coherence/util/AbstractConcurrentQueue.hpp

00001 /*
00002 * AbstractConcurrentQueue.hpp
00003 *
00004 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.
00005 *
00006 * Oracle is a registered trademarks of Oracle Corporation and/or its
00007 * affiliates.
00008 *
00009 * This software is the confidential and proprietary information of Oracle
00010 * Corporation. You shall not disclose such confidential and proprietary
00011 * information and shall use it only in accordance with the terms of the
00012 * license agreement you entered into with Oracle.
00013 *
00014 * This notice may not be removed or altered.
00015 */
00016 #ifndef COH_ABSTRACT_CONCURRENT_QUEUE
00017 #define COH_ABSTRACT_CONCURRENT_QUEUE
00018 
00019 #include "coherence/lang.ns"
00020 
00021 #include "coherence/util/AtomicCounter.hpp"
00022 #include "coherence/util/Queue.hpp"
00023 
00024 COH_OPEN_NAMESPACE2(coherence,util)
00025 
00026 
00027 /**
00028 * The ConcurrentQueue provides a means to efficiently (and in a thread-safe
00029 * manner) queue elements with minimal contention.
00030 *
00031 * Note: The ConcurrentQueue does not support null entries.
00032 *
00033 * @author nsa 2008.01.19
00034 */
00035 class COH_EXPORT AbstractConcurrentQueue
00036     : public abstract_spec<AbstractConcurrentQueue,
00037         extends<Object>,
00038         implements<Queue> >
00039     {
00040     // ----- enums ----------------------------------------------------------
00041 
00042     public:
00043         /**
00044         * The FlushState values are used to indicate the state of the Queue
00045         * with regards to flushing:
00046         *
00047         * <ul>
00048         *   <li>
00049         *     FLUSH_PENDING:  Indicates that a flush is pending. At
00050         *                     some point the queue will flush itself
00051         *                     automatically or the consumer will explicitly
00052         *                     flush the queue.
00053         *   </li>
00054         *   <li>
00055         *     FLUSH_AUTO:     Indicates that no flush is pending as the queue
00056         *                     has been auto flushed. This state will not be
00057         *                     reset to FLUSH_PENDING until the queue has been
00058         *                     cleared and the producer has added a new
00059         *                     element.
00060         *   </li>
00061         *   <li>
00062         *     FLUSH_EXPLICIT: Indicates that no flush is pending as the queue
00063         *                     has been explicitly flushed. This state will
00064         *                     not be reset to FLUSH_PENDING until the queue
00065         *                     has been cleared and the producer has added a
00066         *                     new element, or if the producer calls flush
00067         *                     multiple times before the queue has
00068         *                     been cleared.
00069         *   </li>
00070         * </ul>
00071         */
00072         typedef enum
00073             {
00074             FLUSH_PENDING = 0,
00075             FLUSH_AUTO = 1,
00076             FLUSH_EXPLICIT = 2
00077             } FlushState;
00078 
00079 
00080     // ----- constructors ---------------------------------------------------
00081 
00082     protected:
00083         /**
00084         * @internal
00085         */
00086         AbstractConcurrentQueue();
00087 
00088 
00089     // ----- AbstractConcurrentQueue interface ------------------------------
00090 
00091     public:
00092         /**
00093         * Return the current flush state.
00094         */
00095         virtual FlushState getFlushState() const;
00096 
00097         /**
00098         * Return whether a flush is pending or not.
00099         *
00100         * @return true if a flush is pending
00101         */
00102         virtual bool isFlushPending() const;
00103 
00104         /**
00105         * Set the batch size of the queue.
00106         *
00107         * @param cBatch  the batch size to set
00108         */
00109         virtual void setBatchSize(int32_t cBatch);
00110 
00111         /**
00112         * Return the batch size of the queue.
00113         *
00114         * @return the batch size of the queue
00115         */
00116         virtual int32_t getBatchSize() const;
00117 
00118         /**
00119         * Return the ElementCounter for this queue
00120         *
00121         * @return the element counter for this queue
00122         */
00123         virtual AtomicCounter::Handle getElementCounter();
00124 
00125         /**
00126         * Return the object used for notifications on this queue.
00127         *
00128         * @return the object used for notifications on this queue.
00129         */
00130         virtual Object::Handle getNotifier();
00131 
00132         /**
00133         * Set the object used for notifications on this queue.
00134         *
00135         * @param hNotifier  the object used for notifications on this queue
00136         */
00137         virtual void setNotifier(Object::Handle hNotifier);
00138 
00139         /**
00140         * Wait for the queue to contain at least one entry.
00141         *
00142         * Note: By the time the method returns the entry may have already
00143         * been removed by another thread.
00144         *
00145         * @param cMillis the number of milliseconds to wait before returing
00146         *                without having been notified, or 0 to wait until
00147         *                notified
00148         */
00149         virtual void waitForEntry(int64_t cMillis);
00150 
00151         /**
00152         * Return the total number of times the queue has been emptied.
00153         *
00154         * @return the total number of times the queue has been emptied.
00155         */
00156         virtual int64_t getStatsEmptied() const;
00157 
00158         /**
00159         * Return the total number of times the queue has been flushed.
00160         *
00161         * @return the total number of times the queue has been flushed
00162         */
00163         virtual int64_t getStatsFlushed() const;
00164 
00165 
00166     // ----- internal helpers -----------------------------------------------
00167 
00168     protected:
00169         /**
00170         * Check whether or not the flush (notify) is necessary. This method
00171         * is always called when a new item is added to the queue.
00172         *
00173         * @param cElements  the number of elements in the queue after the
00174         *                   addition
00175         */
00176         virtual void checkFlush(int32_t cElements);
00177 
00178         /**
00179         * Flush the queue.
00180         *
00181         * @param fAuto  iff the flush was invoked automatically based on the
00182         *               notification batch size
00183         *
00184         */
00185         virtual void flush(bool fAuto);
00186 
00187         /**
00188         * Event called each time an element is added to the queue.
00189         */
00190         virtual void onAddElement();
00191 
00192         /**
00193         * Event called when the queue becomes empty.
00194         */
00195         virtual void onEmpty();
00196 
00197         /**
00198         * Set the flush state and return the previous state.
00199         *
00200         * @param nState  the state to set
00201         *
00202         * @return the previous flush state
00203         */
00204         virtual FlushState updateFlushState(FlushState nState);
00205 
00206         /**
00207         * Set the flush state iff the assumed state is correct, return the
00208         * previous flushState
00209         *
00210         * @param nStateAssumed  the assumed current value of the flush state
00211         * @param nStateNew      the new flush state value
00212         *
00213         * @return FlushState  the old flush state value
00214         */
00215         virtual FlushState updateFlushStateConditionally(FlushState nStateAssumed,
00216                 FlushState nStateNew);
00217 
00218         /**
00219         * Return the FlushState counter for this queue
00220         *
00221         * @return the FlushState counter for this queue
00222         */
00223         virtual AtomicCounter::Handle getAtomicFlushState();
00224 
00225         /**
00226         * Set the total number of times the queue has been emptied.
00227         *
00228         * @param cEmpties  the total number of times the queue has been
00229         *                  flushed.
00230         */
00231         virtual void setStatsEmptied(int64_t cEmpties);
00232 
00233         /**
00234         * Set the total number of times the queue has been flushed.
00235         *
00236         * @param cFlushed  the total number of times the queue has been
00237         *                  flushed
00238         */
00239         virtual void setStatsFlushed(int64_t cFlushed);
00240 
00241 
00242     // ----- Queue interface ------------------------------------------------
00243 
00244     public:
00245         /**
00246         * @{inheritDoc}
00247         */
00248         virtual void flush();
00249 
00250         /**
00251         * @{inheritDoc}
00252         */
00253         virtual size32_t size() const;
00254 
00255         /**
00256         *  @{inheritDoc}
00257         */
00258         virtual Object::Holder remove();
00259 
00260 
00261     // ----- data members ---------------------------------------------------
00262 
00263     protected:
00264         /**
00265         * The AtomicLong used to maintain the FlushState.  See
00266         * getFlushState() and setFlushState() helper methods.
00267         */
00268         FinalHandle<AtomicCounter> m_hAtomicFlushState;
00269 
00270         /**
00271         * The queue size at which to auto-flush the queue during an add
00272         * operation.  If the BatchSize is greater then one, the caller must
00273         * externally call flush() when it has finished adding elements in
00274         * order to ensure that they may be processed by any waiting consumer
00275         * thread.
00276         */
00277         int32_t m_iBatchSize;
00278 
00279         /**
00280         * A counter for maintaining the size of the queue.
00281         */
00282         FinalHandle<AtomicCounter> m_hElementCounter;
00283 
00284         /**
00285         * The monitor on which notifications related to a queue addition
00286         * will be performed.  The default value is the Queue itself.  The
00287         * Notifier should not be changed while the queue is in use.  If the
00288         * Notifier is null then notification will be disabled.
00289         */
00290         MemberHandle<Object> m_hNotifier;
00291 
00292         /**
00293         * The total number of times the queue transitioned to the empty
00294         * state.
00295         */
00296         int64_t m_lStatsEmptied;
00297 
00298         /**
00299         * The total number of times the queue has been flushed.
00300         */
00301         int64_t m_lStatusFlushed;
00302 
00303         /**
00304         * Indicate whether the notifier references itself
00305         */
00306         bool m_fSelfNotifier;
00307     };
00308 
00309 COH_CLOSE_NAMESPACE2
00310 
00311 #endif // COH_ABSTRACT_CONCURRENT_QUEUE
Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.