RSSLoader.java (13324B)
1 /* 2 * Copyright (C) 2011 A. Horn 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package org.mcsoxford.rss; 17 18 import java.util.concurrent.BlockingQueue; 19 import java.util.concurrent.ExecutionException; 20 import java.util.concurrent.Future; 21 import java.util.concurrent.LinkedBlockingQueue; 22 import java.util.concurrent.PriorityBlockingQueue; 23 import java.util.concurrent.TimeUnit; 24 import java.util.concurrent.TimeoutException; 25 import java.util.concurrent.atomic.AtomicInteger; 26 27 /** 28 * Asynchronous loader for RSS feeds. RSS feeds can be loaded in FIFO order or 29 * based on priority. Objects of this type can be constructed with one of the 30 * provided static methods: 31 * <ul> 32 * <li>{@link #fifo()}</li> 33 * <li>{@link #fifo(int)}</li> 34 * <li>{@link #priority()}</li> 35 * <li>{@link #priority(int)}</li> 36 * </ul> 37 * 38 * Completed RSS feed loads can be retrieved with {@link RSSLoader#take()}, 39 * {@link RSSLoader#poll()} or {@link RSSLoader#poll(long, TimeUnit)}. 40 * 41 * <p> 42 * <b>Usage Example</b> 43 * 44 * Suppose you want to load an array of RSS feed URIs concurrently before 45 * retrieving the results one at a time. You could write this as: 46 * 47 * <pre> 48 * {@code 49 * void fetchRSS(String[] uris) throws InterruptedException { 50 * RSSLoader loader = RSSLoader.fifo(); 51 * for (String uri : uris) { 52 * loader.load(uri); 53 * } 54 * 55 * Future<RSSFeed> future; 56 * RSSFeed feed; 57 * for (int i = 0; i < uris.length; i++) { 58 * future = loader.take(); 59 * try { 60 * feed = future.get(); 61 * use(feed); 62 * } catch (ExecutionException ignore) {} 63 * } 64 * }} 65 * </pre> 66 * 67 * </p> 68 * 69 * @author A. Horn 70 */ 71 public class RSSLoader { 72 73 /** 74 * Human-readable name of the thread loading RSS feeds 75 */ 76 private final static String DEFAULT_THREAD_NAME = "Asynchronous RSS feed loader"; 77 78 /** 79 * Arrange incoming load requests on this queue. 80 */ 81 private final BlockingQueue<RSSFuture> in; 82 83 /** 84 * Once the an RSS feed has completed loading, place the result on this queue. 85 */ 86 private final BlockingQueue<RSSFuture> out; 87 88 /** 89 * Flag changes are visible after operations on {@link #in} queue. 90 */ 91 private boolean stopped; 92 93 /** 94 * Create an object which can load RSS feeds asynchronously in FIFO order. 95 * 96 * @see #fifo(int) 97 */ 98 public static RSSLoader fifo() { 99 return new RSSLoader(new LinkedBlockingQueue<RSSFuture>()); 100 } 101 102 /** 103 * Create an object which can load RSS feeds asynchronously in FIFO order. 104 * 105 * @param capacity 106 * expected number of URIs to be loaded at a given time 107 */ 108 public static RSSLoader fifo(int capacity) { 109 return new RSSLoader(new LinkedBlockingQueue<RSSFuture>(capacity)); 110 } 111 112 /** 113 * Create an object which can load RSS feeds asynchronously based on priority. 114 * 115 * @see #priority(int) 116 */ 117 public static RSSLoader priority() { 118 return new RSSLoader(new PriorityBlockingQueue<RSSFuture>()); 119 } 120 121 /** 122 * Create an object which can load RSS feeds asynchronously based on priority. 123 * 124 * @param capacity 125 * expected number of URIs to be loaded at a given time 126 */ 127 public static RSSLoader priority(int capacity) { 128 return new RSSLoader(new PriorityBlockingQueue<RSSFuture>(capacity)); 129 } 130 131 /** 132 * Instantiate an object which can load RSS feeds asynchronously. The provided 133 * {@link BlockingQueue} implementation determines the load behaviour. 134 * 135 * @see LinkedBlockingQueue 136 * @see PriorityBlockingQueue 137 */ 138 RSSLoader(BlockingQueue<RSSFuture> in) { 139 this.in = in; 140 this.out = new LinkedBlockingQueue<RSSFuture>(); 141 142 // start separate thread for loading of RSS feeds 143 new Thread(new Loader(new RSSReader()), DEFAULT_THREAD_NAME).start(); 144 } 145 146 /** 147 * Returns {@code true} if RSS feeds are currently being loaded, {@code false} 148 * otherwise. 149 */ 150 public boolean isLoading() { 151 // order of conjuncts matters because of happens-before relationship 152 return !in.isEmpty() && !stopped; 153 } 154 155 /** 156 * Stop thread after finishing loading pending RSS feed URIs. If this loader 157 * has been constructed with {@link #priority()} or {@link #priority(int)}, 158 * only RSS feed loads with priority strictly greater than seven (7) are going 159 * to be completed. 160 * <p> 161 * Subsequent invocations of {@link #load(String)} and 162 * {@link #load(String, int)} return {@code null}. 163 */ 164 public void stop() { 165 // flag writings happen-before enqueue 166 stopped = true; 167 in.offer(SENTINEL); 168 } 169 170 /** 171 * Loads the specified RSS feed URI asynchronously. If this loader has been 172 * constructed with {@link #priority()} or {@link #priority(int)}, then a 173 * default priority of three (3) is used. Otherwise, RSS feeds are loaded in 174 * FIFO order. 175 * <p> 176 * Returns {@code null} if the RSS feed URI cannot be scheduled for loading 177 * due to resource constraints or if {@link #stop()} has been previously 178 * called. 179 * <p> 180 * Completed RSS feed loads can be retrieved by calling {@link #take()}. 181 * Alternatively, non-blocking polling is possible with {@link #poll()}. 182 * 183 * @param uri 184 * RSS feed URI to be loaded 185 * 186 * @return Future representing the RSS feed scheduled for loading, 187 * {@code null} if scheduling failed 188 */ 189 public Future<RSSFeed> load(String uri) { 190 return load(uri, RSSFuture.DEFAULT_PRIORITY); 191 } 192 193 /** 194 * Loads the specified RSS feed URI asynchronously. For the specified priority 195 * to determine the relative loading order of RSS feeds, this loader must have 196 * been constructed with {@link #priority()} or {@link #priority(int)}. 197 * Otherwise, RSS feeds are loaded in FIFO order. 198 * <p> 199 * Returns {@code null} if the RSS feed URI cannot be scheduled for loading 200 * due to resource constraints or if {@link #stop()} has been previously 201 * called. 202 * <p> 203 * Completed RSS feed loads can be retrieved by calling {@link #take()}. 204 * Alternatively, non-blocking polling is possible with {@link #poll()}. 205 * 206 * @param uri 207 * RSS feed URI to be loaded 208 * @param priority 209 * larger integer gives higher priority 210 * 211 * @return Future representing the RSS feed scheduled for loading, 212 * {@code null} if scheduling failed 213 */ 214 public Future<RSSFeed> load(String uri, int priority) { 215 if (uri == null) { 216 throw new IllegalArgumentException("RSS feed URI must not be null."); 217 } 218 219 // optimization (after flag changes have become visible) 220 if (stopped) { 221 return null; 222 } 223 224 // flag readings happen-after enqueue 225 final RSSFuture future = new RSSFuture(uri, priority); 226 final boolean ok = in.offer(future); 227 228 if (!ok || stopped) { 229 return null; 230 } 231 232 return future; 233 } 234 235 /** 236 * Retrieves and removes the next Future representing the result of loading an 237 * RSS feed, waiting if none are yet present. 238 * 239 * @return the {@link Future} representing the loaded RSS feed 240 * 241 * @throws InterruptedException 242 * if interrupted while waiting 243 */ 244 public Future<RSSFeed> take() throws InterruptedException { 245 return out.take(); 246 } 247 248 /** 249 * Retrieves and removes the next Future representing the result of loading an 250 * RSS feed or {@code null} if none are present. 251 * 252 * @return the {@link Future} representing the loaded RSS feed, or 253 * {@code null} if none are present 254 * 255 * @throws InterruptedException 256 * if interrupted while waiting 257 */ 258 public Future<RSSFeed> poll() { 259 return out.poll(); 260 } 261 262 /** 263 * Retrieves and removes the Future representing the result of loading an RSS 264 * feed, waiting if necessary up to the specified wait time if none are yet 265 * present. 266 * 267 * @param timeout 268 * how long to wait before giving up, in units of {@code unit} 269 * @param unit 270 * a {@link TimeUnit} determining how to interpret the 271 * {@code timeout} parameter 272 * @return the {@link Future} representing the loaded RSS feed, or 273 * {@code null} if none are present within the specified time interval 274 * @throws InterruptedException 275 * if interrupted while waiting 276 */ 277 public Future<RSSFeed> poll(long timeout, TimeUnit unit) throws InterruptedException { 278 return out.poll(timeout, unit); 279 } 280 281 /** 282 * Internal consumer of RSS feed URIs stored in the blocking queue. 283 */ 284 class Loader implements Runnable { 285 286 private final RSSReader reader; 287 288 Loader(RSSReader reader) { 289 this.reader = reader; 290 } 291 292 /** 293 * Keep on loading RSS feeds by dequeuing incoming tasks until the sentinel 294 * is encountered. 295 */ 296 @Override 297 public void run() { 298 try { 299 RSSFuture future = null; 300 RSSFeed feed; 301 while ((future = in.take()) != SENTINEL) { 302 303 if (future.status.compareAndSet(RSSFuture.READY, RSSFuture.LOADING)) { 304 try { 305 // perform loading outside of locked region 306 feed = reader.load(future.uri); 307 308 // set successfully loaded RSS feed 309 future.set(feed, /* error */null); 310 311 // enable caller to consume the loaded RSS feed 312 out.add(future); 313 } catch (RSSException e) { 314 // throw ExecutionException when calling RSSFuture::get() 315 future.set(/* feed */null, e); 316 } catch (RSSFault e) { 317 // throw ExecutionException when calling RSSFuture::get() 318 future.set(/* feed */null, e); 319 } finally { 320 // RSSFuture::isDone() returns true even if an error occurred 321 future.status.compareAndSet(RSSFuture.LOADING, RSSFuture.LOADED); 322 } 323 } 324 325 } 326 } catch (InterruptedException e) { 327 // Restore the interrupted status 328 Thread.currentThread().interrupt(); 329 } 330 } 331 332 } 333 334 /** 335 * Internal sentinel to stop the thread that is loading RSS feeds. 336 */ 337 private final static RSSFuture SENTINEL = new RSSFuture(null, /* priority */7); 338 339 /** 340 * Offer callers control over the asynchronous loading of an RSS feed. 341 */ 342 static class RSSFuture implements Future<RSSFeed>, Comparable<RSSFuture> { 343 344 static final int DEFAULT_PRIORITY = 3; 345 static final int READY = 0; 346 static final int LOADING = 1; 347 static final int LOADED = 2; 348 static final int CANCELLED = 4; 349 350 /** RSS feed URI */ 351 final String uri; 352 353 /** Larger integer gives higher priority */ 354 final int priority; 355 356 AtomicInteger status; 357 358 boolean waiting; 359 RSSFeed feed; 360 Exception cause; 361 362 RSSFuture(String uri, int priority) { 363 this.uri = uri; 364 this.priority = priority; 365 status = new AtomicInteger(READY); 366 } 367 368 @Override 369 public boolean cancel(boolean mayInterruptIfRunning) { 370 return isCancelled() || status.compareAndSet(READY, CANCELLED); 371 } 372 373 @Override 374 public boolean isCancelled() { 375 return status.get() == CANCELLED; 376 } 377 378 @Override 379 public boolean isDone() { 380 return (status.get() & (LOADED | CANCELLED)) != 0; 381 } 382 383 @Override 384 public synchronized RSSFeed get() throws InterruptedException, ExecutionException { 385 if (feed == null && cause == null) { 386 try { 387 waiting = true; 388 389 // guard against spurious wakeups 390 while (waiting) { 391 wait(); 392 } 393 } finally { 394 waiting = false; 395 } 396 } 397 398 if (cause != null) { 399 throw new ExecutionException(cause); 400 } 401 402 return feed; 403 } 404 405 @Override 406 public synchronized RSSFeed get(long timeout, TimeUnit unit) 407 throws InterruptedException, ExecutionException, TimeoutException { 408 409 if (feed == null && cause == null) { 410 try { 411 waiting = true; 412 413 final long timeoutMillis = unit.toMillis(timeout); 414 final long startMillis = System.currentTimeMillis(); 415 416 // guard against spurious wakeups 417 while (waiting) { 418 wait(timeoutMillis); 419 420 // check timeout 421 if (System.currentTimeMillis() - startMillis > timeoutMillis) { 422 throw new TimeoutException("RSS feed loading timed out"); 423 } 424 } 425 } finally { 426 waiting = false; 427 } 428 } 429 430 if (cause != null) { 431 throw new ExecutionException(cause); 432 } 433 434 return feed; 435 } 436 437 synchronized void set(RSSFeed feed, Exception cause) { 438 this.feed = feed; 439 this.cause = cause; 440 441 if (waiting) { 442 waiting = false; 443 notifyAll(); 444 } 445 } 446 447 @Override 448 public int compareTo(RSSFuture other) { 449 // Note: head of PriorityQueue implementation is the least element 450 return other.priority - priority; 451 } 452 } 453 454 } 455