001// Licensed under the Apache License, Version 2.0 (the "License"); 002// you may not use this file except in compliance with the License. 003// You may obtain a copy of the License at 004// 005// http://www.apache.org/licenses/LICENSE-2.0 006// 007// Unless required by applicable law or agreed to in writing, software 008// distributed under the License is distributed on an "AS IS" BASIS, 009// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 010// See the License for the specific language governing permissions and 011// limitations under the License. 012 013package org.apache.tapestry5.ioc.internal.services.cron; 014 015import org.apache.tapestry5.ioc.Invokable; 016import org.apache.tapestry5.ioc.annotations.PostInjection; 017import org.apache.tapestry5.ioc.internal.util.CollectionFactory; 018import org.apache.tapestry5.ioc.services.ParallelExecutor; 019import org.apache.tapestry5.ioc.services.RegistryShutdownHub; 020import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor; 021import org.apache.tapestry5.ioc.services.cron.PeriodicJob; 022import org.apache.tapestry5.ioc.services.cron.Schedule; 023import org.slf4j.Logger; 024 025import java.util.List; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.concurrent.locks.Lock; 028import java.util.concurrent.locks.ReentrantLock; 029 030public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable 031{ 032 private final ParallelExecutor parallelExecutor; 033 034 private final Logger logger; 035 036 // Synchronized by jobLock 037 private final List<Job> jobs = CollectionFactory.newList(); 038 039 private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor"); 040 041 private transient boolean shutdown; 042 043 private static final long FIVE_MINUTES = 5 * 60 * 1000; 044 045 private final AtomicInteger jobIdAllocator = new AtomicInteger(); 046 047 private final Lock jobLock = new ReentrantLock(); 048 049 private class Job implements PeriodicJob, Invokable<Void> 050 { 051 final int jobId = jobIdAllocator.incrementAndGet(); 052 053 private final Schedule schedule; 054 055 private final String name; 056 057 private final Runnable runnableJob; 058 059 private boolean executing, canceled; 060 061 private long nextExecution; 062 063 public Job(Schedule schedule, String name, Runnable runnableJob) 064 { 065 this.schedule = schedule; 066 this.name = name; 067 this.runnableJob = runnableJob; 068 069 nextExecution = schedule.firstExecution(); 070 } 071 072 @Override 073 public String getName() 074 { 075 return name; 076 } 077 078 public long getNextExecution() 079 { 080 try 081 { 082 jobLock.lock(); 083 return nextExecution; 084 } finally 085 { 086 jobLock.unlock(); 087 } 088 } 089 090 091 @Override 092 public boolean isExecuting() 093 { 094 try 095 { 096 jobLock.lock(); 097 return executing; 098 } finally 099 { 100 jobLock.unlock(); 101 } 102 } 103 104 @Override 105 public boolean isCanceled() 106 { 107 try 108 { 109 jobLock.lock(); 110 return canceled; 111 } finally 112 { 113 jobLock.unlock(); 114 } 115 } 116 117 @Override 118 public void cancel() 119 { 120 try 121 { 122 jobLock.lock(); 123 124 canceled = true; 125 126 if (!executing) 127 { 128 removeJob(this); 129 } 130 131 // Otherwise, it will be caught when the job finishes execution. 132 } finally 133 { 134 jobLock.unlock(); 135 } 136 } 137 138 @Override 139 public String toString() 140 { 141 StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId); 142 143 builder.append(", (").append(name).append(")"); 144 145 if (executing) 146 { 147 builder.append(", executing"); 148 } 149 150 if (canceled) 151 { 152 builder.append(", canceled"); 153 } else 154 { 155 builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution)); 156 } 157 158 return builder.append("]").toString(); 159 } 160 161 /** 162 * Starts execution of the job; this sets the executing flag, calculates the next execution time, 163 * and uses the ParallelExecutor to run the job. 164 */ 165 void start() 166 { 167 try 168 { 169 jobLock.lock(); 170 executing = true; 171 172 // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options 173 // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing 174 // overlapping executions of the Job on a more rigid schedule. Use Quartz. 175 176 nextExecution = schedule.nextExecution(nextExecution); 177 178 parallelExecutor.invoke(this); 179 } finally 180 { 181 jobLock.unlock(); 182 } 183 184 if (logger.isTraceEnabled()) 185 { 186 logger.trace(this + " sent for execution"); 187 } 188 } 189 190 void cleanupAfterExecution() 191 { 192 try 193 { 194 if (logger.isTraceEnabled()) 195 { 196 logger.trace(this + " execution complete"); 197 } 198 199 executing = false; 200 201 if (canceled) 202 { 203 removeJob(this); 204 } else 205 { 206 // Again, naive but necessary. 207 thread.interrupt(); 208 } 209 } finally 210 { 211 jobLock.unlock(); 212 } 213 } 214 215 @Override 216 public Void invoke() 217 { 218 if (logger.isDebugEnabled()) 219 { 220 logger.debug(String.format("Executing job #%d (%s)", jobId, name)); 221 } 222 223 try 224 { 225 runnableJob.run(); 226 } finally 227 { 228 cleanupAfterExecution(); 229 } 230 231 return null; 232 } 233 234 } 235 236 public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger) 237 { 238 this.parallelExecutor = parallelExecutor; 239 this.logger = logger; 240 } 241 242 @PostInjection 243 public void start(RegistryShutdownHub hub) 244 { 245 hub.addRegistryShutdownListener(new Runnable() 246 { 247 @Override 248 public void run() 249 { 250 registryDidShutdown(); 251 } 252 }); 253 254 thread.start(); 255 } 256 257 258 void removeJob(Job job) 259 { 260 if (logger.isDebugEnabled()) 261 { 262 logger.debug("Removing " + job); 263 } 264 265 try 266 { 267 jobLock.lock(); 268 jobs.remove(job); 269 } finally 270 { 271 jobLock.unlock(); 272 } 273 } 274 275 276 @Override 277 public PeriodicJob addJob(Schedule schedule, String name, Runnable job) 278 { 279 assert schedule != null; 280 assert name != null; 281 assert job != null; 282 283 Job periodicJob = new Job(schedule, name, job); 284 285 try 286 { 287 jobLock.lock(); 288 289 jobs.add(periodicJob); 290 } finally 291 { 292 jobLock.unlock(); 293 } 294 295 if (logger.isDebugEnabled()) 296 { 297 logger.debug("Added " + periodicJob); 298 } 299 300 // Wake the thread so that it can start the job, if necessary. 301 302 // Technically, this is only necessary if the new job is scheduled earlier 303 // than any job currently in the list of jobs, but this naive implementation 304 // is simpler. 305 thread.interrupt(); 306 307 return periodicJob; 308 } 309 310 @Override 311 public void run() 312 { 313 while (!shutdown) 314 { 315 long nextExecution = executeCurrentBatch(); 316 317 try 318 { 319 long delay = nextExecution - System.currentTimeMillis(); 320 321 if (logger.isTraceEnabled()) 322 { 323 logger.trace(String.format("Sleeping for %,d ms", delay)); 324 } 325 326 if (delay > 0) 327 { 328 Thread.sleep(delay); 329 } 330 } catch (InterruptedException 331 ex) 332 { 333 // Ignored; the thread is interrupted() to shut it down, 334 // or to have it execute a new batch. 335 336 logger.trace("Interrupted"); 337 } 338 } 339 } 340 341 private void registryDidShutdown() 342 { 343 shutdown = true; 344 345 thread.interrupt(); 346 } 347 348 /** 349 * Finds jobs and executes jobs that are ready to be executed. 350 * 351 * @return the next execution time (from the non-executing job that is scheduled earliest for execution). 352 */ 353 private long executeCurrentBatch() 354 { 355 long now = System.currentTimeMillis(); 356 long nextExecution = now + FIVE_MINUTES; 357 358 try 359 { 360 jobLock.lock(); 361 362 for (Job job : jobs) 363 { 364 if (job.isExecuting()) 365 { 366 continue; 367 } 368 369 long jobNextExecution = job.getNextExecution(); 370 371 if (jobNextExecution <= now) 372 { 373 job.start(); 374 } else 375 { 376 nextExecution = Math.min(nextExecution, jobNextExecution); 377 } 378 } 379 } finally 380 { 381 jobLock.unlock(); 382 } 383 384 return nextExecution; 385 } 386 387 388}