android-ibc-forum

Unnamed repository; edit this file 'description' to name the repository.
git clone http://git.code.weiherhei.de/android-ibc-forum.git
Log | Files | Refs

RSSLoader.java (13324B)


      1 /*
      2  * Copyright (C) 2011 A. Horn
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 package org.mcsoxford.rss;
     17 
     18 import java.util.concurrent.BlockingQueue;
     19 import java.util.concurrent.ExecutionException;
     20 import java.util.concurrent.Future;
     21 import java.util.concurrent.LinkedBlockingQueue;
     22 import java.util.concurrent.PriorityBlockingQueue;
     23 import java.util.concurrent.TimeUnit;
     24 import java.util.concurrent.TimeoutException;
     25 import java.util.concurrent.atomic.AtomicInteger;
     26 
     27 /**
     28  * Asynchronous loader for RSS feeds. RSS feeds can be loaded in FIFO order or
     29  * based on priority. Objects of this type can be constructed with one of the
     30  * provided static methods:
     31  * <ul>
     32  * <li>{@link #fifo()}</li>
     33  * <li>{@link #fifo(int)}</li>
     34  * <li>{@link #priority()}</li>
     35  * <li>{@link #priority(int)}</li>
     36  * </ul>
     37  * 
     38  * Completed RSS feed loads can be retrieved with {@link RSSLoader#take()},
     39  * {@link RSSLoader#poll()} or {@link RSSLoader#poll(long, TimeUnit)}.
     40  * 
     41  * <p>
     42  * <b>Usage Example</b>
     43  * 
     44  * Suppose you want to load an array of RSS feed URIs concurrently before
     45  * retrieving the results one at a time. You could write this as:
     46  * 
     47  * <pre>
     48  * {@code 
     49  *  void fetchRSS(String[] uris) throws InterruptedException {
     50  *     RSSLoader loader = RSSLoader.fifo();
     51  *     for (String uri : uris) {
     52  *       loader.load(uri);
     53  *     }
     54  *     
     55  *     Future&lt;RSSFeed&gt; future;
     56  *     RSSFeed feed;
     57  *     for (int i = 0; i &lt; uris.length; i++) {
     58  *       future = loader.take();
     59  *       try {
     60  *         feed = future.get();
     61  *         use(feed);
     62  *       } catch (ExecutionException ignore) {}
     63  *     }
     64  * }}
     65  * </pre>
     66  *
     67  * </p>
     68  * 
     69  * @author A. Horn
     70  */
     71 public class RSSLoader {
     72 
     73   /**
     74    * Human-readable name of the thread loading RSS feeds
     75    */
     76   private final static String DEFAULT_THREAD_NAME = "Asynchronous RSS feed loader";
     77 
     78   /**
     79    * Arrange incoming load requests on this queue.
     80    */
     81   private final BlockingQueue<RSSFuture> in;
     82 
     83   /**
     84    * Once the an RSS feed has completed loading, place the result on this queue.
     85    */
     86   private final BlockingQueue<RSSFuture> out;
     87 
     88   /**
     89    * Flag changes are visible after operations on {@link #in} queue.
     90    */
     91   private boolean stopped;
     92 
     93   /**
     94    * Create an object which can load RSS feeds asynchronously in FIFO order.
     95    * 
     96    * @see #fifo(int)
     97    */
     98   public static RSSLoader fifo() {
     99     return new RSSLoader(new LinkedBlockingQueue<RSSFuture>());
    100   }
    101 
    102   /**
    103    * Create an object which can load RSS feeds asynchronously in FIFO order.
    104    * 
    105    * @param capacity
    106    *          expected number of URIs to be loaded at a given time
    107    */
    108   public static RSSLoader fifo(int capacity) {
    109     return new RSSLoader(new LinkedBlockingQueue<RSSFuture>(capacity));
    110   }
    111 
    112   /**
    113    * Create an object which can load RSS feeds asynchronously based on priority.
    114    * 
    115    * @see #priority(int)
    116    */
    117   public static RSSLoader priority() {
    118     return new RSSLoader(new PriorityBlockingQueue<RSSFuture>());
    119   }
    120 
    121   /**
    122    * Create an object which can load RSS feeds asynchronously based on priority.
    123    * 
    124    * @param capacity
    125    *          expected number of URIs to be loaded at a given time
    126    */
    127   public static RSSLoader priority(int capacity) {
    128     return new RSSLoader(new PriorityBlockingQueue<RSSFuture>(capacity));
    129   }
    130 
    131   /**
    132    * Instantiate an object which can load RSS feeds asynchronously. The provided
    133    * {@link BlockingQueue} implementation determines the load behaviour.
    134    * 
    135    * @see LinkedBlockingQueue
    136    * @see PriorityBlockingQueue
    137    */
    138   RSSLoader(BlockingQueue<RSSFuture> in) {
    139     this.in = in;
    140     this.out = new LinkedBlockingQueue<RSSFuture>();
    141 
    142     // start separate thread for loading of RSS feeds
    143     new Thread(new Loader(new RSSReader()), DEFAULT_THREAD_NAME).start();
    144   }
    145 
    146   /**
    147    * Returns {@code true} if RSS feeds are currently being loaded, {@code false}
    148    * otherwise.
    149    */
    150   public boolean isLoading() {
    151     // order of conjuncts matters because of happens-before relationship
    152     return !in.isEmpty() && !stopped;
    153   }
    154 
    155   /**
    156    * Stop thread after finishing loading pending RSS feed URIs. If this loader
    157    * has been constructed with {@link #priority()} or {@link #priority(int)},
    158    * only RSS feed loads with priority strictly greater than seven (7) are going
    159    * to be completed.
    160    * <p>
    161    * Subsequent invocations of {@link #load(String)} and
    162    * {@link #load(String, int)} return {@code null}.
    163    */
    164   public void stop() {
    165     // flag writings happen-before enqueue
    166     stopped = true;
    167     in.offer(SENTINEL);
    168   }
    169 
    170   /**
    171    * Loads the specified RSS feed URI asynchronously. If this loader has been
    172    * constructed with {@link #priority()} or {@link #priority(int)}, then a
    173    * default priority of three (3) is used. Otherwise, RSS feeds are loaded in
    174    * FIFO order.
    175    * <p>
    176    * Returns {@code null} if the RSS feed URI cannot be scheduled for loading
    177    * due to resource constraints or if {@link #stop()} has been previously
    178    * called.
    179    * <p>
    180    * Completed RSS feed loads can be retrieved by calling {@link #take()}.
    181    * Alternatively, non-blocking polling is possible with {@link #poll()}.
    182    * 
    183    * @param uri
    184    *          RSS feed URI to be loaded
    185    * 
    186    * @return Future representing the RSS feed scheduled for loading,
    187    *         {@code null} if scheduling failed
    188    */
    189   public Future<RSSFeed> load(String uri) {
    190     return load(uri, RSSFuture.DEFAULT_PRIORITY);
    191   }
    192 
    193   /**
    194    * Loads the specified RSS feed URI asynchronously. For the specified priority
    195    * to determine the relative loading order of RSS feeds, this loader must have
    196    * been constructed with {@link #priority()} or {@link #priority(int)}.
    197    * Otherwise, RSS feeds are loaded in FIFO order.
    198    * <p>
    199    * Returns {@code null} if the RSS feed URI cannot be scheduled for loading
    200    * due to resource constraints or if {@link #stop()} has been previously
    201    * called.
    202    * <p>
    203    * Completed RSS feed loads can be retrieved by calling {@link #take()}.
    204    * Alternatively, non-blocking polling is possible with {@link #poll()}.
    205    * 
    206    * @param uri
    207    *          RSS feed URI to be loaded
    208    * @param priority
    209    *          larger integer gives higher priority
    210    * 
    211    * @return Future representing the RSS feed scheduled for loading,
    212    *         {@code null} if scheduling failed
    213    */
    214   public Future<RSSFeed> load(String uri, int priority) {
    215     if (uri == null) {
    216       throw new IllegalArgumentException("RSS feed URI must not be null.");
    217     }
    218 
    219     // optimization (after flag changes have become visible)
    220     if (stopped) {
    221       return null;
    222     }
    223 
    224     // flag readings happen-after enqueue
    225     final RSSFuture future = new RSSFuture(uri, priority);
    226     final boolean ok = in.offer(future);
    227 
    228     if (!ok || stopped) {
    229       return null;
    230     }
    231 
    232     return future;
    233   }
    234 
    235   /**
    236    * Retrieves and removes the next Future representing the result of loading an
    237    * RSS feed, waiting if none are yet present.
    238    * 
    239    * @return the {@link Future} representing the loaded RSS feed
    240    * 
    241    * @throws InterruptedException
    242    *           if interrupted while waiting
    243    */
    244   public Future<RSSFeed> take() throws InterruptedException {
    245     return out.take();
    246   }
    247 
    248   /**
    249    * Retrieves and removes the next Future representing the result of loading an
    250    * RSS feed or {@code null} if none are present.
    251    * 
    252    * @return the {@link Future} representing the loaded RSS feed, or
    253    *         {@code null} if none are present
    254    * 
    255    * @throws InterruptedException
    256    *           if interrupted while waiting
    257    */
    258   public Future<RSSFeed> poll() {
    259     return out.poll();
    260   }
    261 
    262   /**
    263    * Retrieves and removes the Future representing the result of loading an RSS
    264    * feed, waiting if necessary up to the specified wait time if none are yet
    265    * present.
    266    * 
    267    * @param timeout
    268    *          how long to wait before giving up, in units of {@code unit}
    269    * @param unit
    270    *          a {@link TimeUnit} determining how to interpret the
    271    *          {@code timeout} parameter
    272    * @return the {@link Future} representing the loaded RSS feed, or
    273    *         {@code null} if none are present within the specified time interval
    274    * @throws InterruptedException
    275    *           if interrupted while waiting
    276    */
    277   public Future<RSSFeed> poll(long timeout, TimeUnit unit) throws InterruptedException {
    278     return out.poll(timeout, unit);
    279   }
    280 
    281   /**
    282    * Internal consumer of RSS feed URIs stored in the blocking queue.
    283    */
    284   class Loader implements Runnable {
    285 
    286     private final RSSReader reader;
    287 
    288     Loader(RSSReader reader) {
    289       this.reader = reader;
    290     }
    291 
    292     /**
    293      * Keep on loading RSS feeds by dequeuing incoming tasks until the sentinel
    294      * is encountered.
    295      */
    296     @Override
    297     public void run() {
    298       try {
    299         RSSFuture future = null;
    300         RSSFeed feed;
    301         while ((future = in.take()) != SENTINEL) {
    302 
    303           if (future.status.compareAndSet(RSSFuture.READY, RSSFuture.LOADING)) {
    304             try {
    305               // perform loading outside of locked region
    306               feed = reader.load(future.uri);
    307 
    308               // set successfully loaded RSS feed
    309               future.set(feed, /* error */null);
    310 
    311               // enable caller to consume the loaded RSS feed
    312               out.add(future);
    313             } catch (RSSException e) {
    314               // throw ExecutionException when calling RSSFuture::get()
    315               future.set(/* feed */null, e);
    316             } catch (RSSFault e) {
    317               // throw ExecutionException when calling RSSFuture::get()
    318               future.set(/* feed */null, e);
    319             } finally {
    320               // RSSFuture::isDone() returns true even if an error occurred
    321               future.status.compareAndSet(RSSFuture.LOADING, RSSFuture.LOADED);
    322             }
    323           }
    324 
    325         }
    326       } catch (InterruptedException e) {
    327         // Restore the interrupted status
    328         Thread.currentThread().interrupt();
    329       }
    330     }
    331 
    332   }
    333 
    334   /**
    335    * Internal sentinel to stop the thread that is loading RSS feeds.
    336    */
    337   private final static RSSFuture SENTINEL = new RSSFuture(null, /* priority */7);
    338 
    339   /**
    340    * Offer callers control over the asynchronous loading of an RSS feed.
    341    */
    342   static class RSSFuture implements Future<RSSFeed>, Comparable<RSSFuture> {
    343 
    344     static final int DEFAULT_PRIORITY = 3;
    345     static final int READY = 0;
    346     static final int LOADING = 1;
    347     static final int LOADED = 2;
    348     static final int CANCELLED = 4;
    349 
    350     /** RSS feed URI */
    351     final String uri;
    352 
    353     /** Larger integer gives higher priority */
    354     final int priority;
    355 
    356     AtomicInteger status;
    357 
    358     boolean waiting;
    359     RSSFeed feed;
    360     Exception cause;
    361 
    362     RSSFuture(String uri, int priority) {
    363       this.uri = uri;
    364       this.priority = priority;
    365       status = new AtomicInteger(READY);
    366     }
    367 
    368     @Override
    369     public boolean cancel(boolean mayInterruptIfRunning) {
    370       return isCancelled() || status.compareAndSet(READY, CANCELLED);
    371     }
    372 
    373     @Override
    374     public boolean isCancelled() {
    375       return status.get() == CANCELLED;
    376     }
    377 
    378     @Override
    379     public boolean isDone() {
    380       return (status.get() & (LOADED | CANCELLED)) != 0;
    381     }
    382 
    383     @Override
    384     public synchronized RSSFeed get() throws InterruptedException, ExecutionException {
    385       if (feed == null && cause == null) {
    386         try {
    387           waiting = true;
    388 
    389           // guard against spurious wakeups
    390           while (waiting) {
    391             wait();
    392           }
    393         } finally {
    394           waiting = false;
    395         }
    396       }
    397 
    398       if (cause != null) {
    399         throw new ExecutionException(cause);
    400       }
    401 
    402       return feed;
    403     }
    404 
    405     @Override
    406     public synchronized RSSFeed get(long timeout, TimeUnit unit)
    407         throws InterruptedException, ExecutionException, TimeoutException {
    408 
    409       if (feed == null && cause == null) {
    410         try {
    411           waiting = true;
    412 
    413           final long timeoutMillis = unit.toMillis(timeout);
    414           final long startMillis = System.currentTimeMillis();
    415 
    416           // guard against spurious wakeups
    417           while (waiting) {
    418             wait(timeoutMillis);
    419 
    420             // check timeout
    421             if (System.currentTimeMillis() - startMillis > timeoutMillis) {
    422               throw new TimeoutException("RSS feed loading timed out");
    423             }
    424           }
    425         } finally {
    426           waiting = false;
    427         }
    428       }
    429 
    430       if (cause != null) {
    431         throw new ExecutionException(cause);
    432       }
    433 
    434       return feed;
    435     }
    436 
    437     synchronized void set(RSSFeed feed, Exception cause) {
    438       this.feed = feed;
    439       this.cause = cause;
    440 
    441       if (waiting) {
    442         waiting = false;
    443         notifyAll();
    444       }
    445     }
    446 
    447     @Override
    448     public int compareTo(RSSFuture other) {
    449       // Note: head of PriorityQueue implementation is the least element
    450       return other.priority - priority;
    451     }
    452   }
    453 
    454 }
    455