00001 /* 00002 * AbstractConcurrentQueue.hpp 00003 * 00004 * Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved. 00005 * 00006 * Oracle is a registered trademarks of Oracle Corporation and/or its 00007 * affiliates. 00008 * 00009 * This software is the confidential and proprietary information of Oracle 00010 * Corporation. You shall not disclose such confidential and proprietary 00011 * information and shall use it only in accordance with the terms of the 00012 * license agreement you entered into with Oracle. 00013 * 00014 * This notice may not be removed or altered. 00015 */ 00016 #ifndef COH_ABSTRACT_CONCURRENT_QUEUE 00017 #define COH_ABSTRACT_CONCURRENT_QUEUE 00018 00019 #include "coherence/lang.ns" 00020 00021 #include "coherence/util/AtomicCounter.hpp" 00022 #include "coherence/util/Queue.hpp" 00023 00024 COH_OPEN_NAMESPACE2(coherence,util) 00025 00026 00027 /** 00028 * The ConcurrentQueue provides a means to efficiently (and in a thread-safe 00029 * manner) queue elements with minimal contention. 00030 * 00031 * Note: The ConcurrentQueue does not support null entries. 00032 * 00033 * @author nsa 2008.01.19 00034 */ 00035 class COH_EXPORT AbstractConcurrentQueue 00036 : public abstract_spec<AbstractConcurrentQueue, 00037 extends<Object>, 00038 implements<Queue> > 00039 { 00040 // ----- enums ---------------------------------------------------------- 00041 00042 public: 00043 /** 00044 * The FlushState values are used to indicate the state of the Queue 00045 * with regards to flushing: 00046 * 00047 * <ul> 00048 * <li> 00049 * FLUSH_PENDING: Indicates that a flush is pending. At 00050 * some point the queue will flush itself 00051 * automatically or the consumer will explicitly 00052 * flush the queue. 00053 * </li> 00054 * <li> 00055 * FLUSH_AUTO: Indicates that no flush is pending as the queue 00056 * has been auto flushed. This state will not be 00057 * reset to FLUSH_PENDING until the queue has been 00058 * cleared and the producer has added a new 00059 * element. 00060 * </li> 00061 * <li> 00062 * FLUSH_EXPLICIT: Indicates that no flush is pending as the queue 00063 * has been explicitly flushed. This state will 00064 * not be reset to FLUSH_PENDING until the queue 00065 * has been cleared and the producer has added a 00066 * new element, or if the producer calls flush 00067 * multiple times before the queue has 00068 * been cleared. 00069 * </li> 00070 * </ul> 00071 */ 00072 typedef enum 00073 { 00074 FLUSH_PENDING = 0, 00075 FLUSH_AUTO = 1, 00076 FLUSH_EXPLICIT = 2 00077 } FlushState; 00078 00079 00080 // ----- constructors --------------------------------------------------- 00081 00082 protected: 00083 /** 00084 * @internal 00085 */ 00086 AbstractConcurrentQueue(); 00087 00088 00089 // ----- AbstractConcurrentQueue interface ------------------------------ 00090 00091 public: 00092 /** 00093 * Return the current flush state. 00094 */ 00095 virtual FlushState getFlushState() const; 00096 00097 /** 00098 * Return whether a flush is pending or not. 00099 * 00100 * @return true if a flush is pending 00101 */ 00102 virtual bool isFlushPending() const; 00103 00104 /** 00105 * Set the batch size of the queue. 00106 * 00107 * @param cBatch the batch size to set 00108 */ 00109 virtual void setBatchSize(int32_t cBatch); 00110 00111 /** 00112 * Return the batch size of the queue. 00113 * 00114 * @return the batch size of the queue 00115 */ 00116 virtual int32_t getBatchSize() const; 00117 00118 /** 00119 * Return the ElementCounter for this queue 00120 * 00121 * @return the element counter for this queue 00122 */ 00123 virtual AtomicCounter::Handle getElementCounter(); 00124 00125 /** 00126 * Return the object used for notifications on this queue. 00127 * 00128 * @return the object used for notifications on this queue. 00129 */ 00130 virtual Object::Handle getNotifier(); 00131 00132 /** 00133 * Set the object used for notifications on this queue. 00134 * 00135 * @param hNotifier the object used for notifications on this queue 00136 */ 00137 virtual void setNotifier(Object::Handle hNotifier); 00138 00139 /** 00140 * Wait for the queue to contain at least one entry. 00141 * 00142 * Note: By the time the method returns the entry may have already 00143 * been removed by another thread. 00144 * 00145 * @param cMillis the number of milliseconds to wait before returing 00146 * without having been notified, or 0 to wait until 00147 * notified 00148 */ 00149 virtual void waitForEntry(int64_t cMillis); 00150 00151 /** 00152 * Return the total number of times the queue has been emptied. 00153 * 00154 * @return the total number of times the queue has been emptied. 00155 */ 00156 virtual int64_t getStatsEmptied() const; 00157 00158 /** 00159 * Return the total number of times the queue has been flushed. 00160 * 00161 * @return the total number of times the queue has been flushed 00162 */ 00163 virtual int64_t getStatsFlushed() const; 00164 00165 00166 // ----- internal helpers ----------------------------------------------- 00167 00168 protected: 00169 /** 00170 * Check whether or not the flush (notify) is necessary. This method 00171 * is always called when a new item is added to the queue. 00172 * 00173 * @param cElements the number of elements in the queue after the 00174 * addition 00175 */ 00176 virtual void checkFlush(int32_t cElements); 00177 00178 /** 00179 * Flush the queue. 00180 * 00181 * @param fAuto iff the flush was invoked automatically based on the 00182 * notification batch size 00183 * 00184 */ 00185 virtual void flush(bool fAuto); 00186 00187 /** 00188 * Event called each time an element is added to the queue. 00189 */ 00190 virtual void onAddElement(); 00191 00192 /** 00193 * Event called when the queue becomes empty. 00194 */ 00195 virtual void onEmpty(); 00196 00197 /** 00198 * Set the flush state and return the previous state. 00199 * 00200 * @param nState the state to set 00201 * 00202 * @return the previous flush state 00203 */ 00204 virtual FlushState updateFlushState(FlushState nState); 00205 00206 /** 00207 * Set the flush state iff the assumed state is correct, return the 00208 * previous flushState 00209 * 00210 * @param nStateAssumed the assumed current value of the flush state 00211 * @param nStateNew the new flush state value 00212 * 00213 * @return FlushState the old flush state value 00214 */ 00215 virtual FlushState updateFlushStateConditionally(FlushState nStateAssumed, 00216 FlushState nStateNew); 00217 00218 /** 00219 * Return the FlushState counter for this queue 00220 * 00221 * @return the FlushState counter for this queue 00222 */ 00223 virtual AtomicCounter::Handle getAtomicFlushState(); 00224 00225 /** 00226 * Set the total number of times the queue has been emptied. 00227 * 00228 * @param cEmpties the total number of times the queue has been 00229 * flushed. 00230 */ 00231 virtual void setStatsEmptied(int64_t cEmpties); 00232 00233 /** 00234 * Set the total number of times the queue has been flushed. 00235 * 00236 * @param cFlushed the total number of times the queue has been 00237 * flushed 00238 */ 00239 virtual void setStatsFlushed(int64_t cFlushed); 00240 00241 00242 // ----- Queue interface ------------------------------------------------ 00243 00244 public: 00245 /** 00246 * @{inheritDoc} 00247 */ 00248 virtual void flush(); 00249 00250 /** 00251 * @{inheritDoc} 00252 */ 00253 virtual size32_t size() const; 00254 00255 /** 00256 * @{inheritDoc} 00257 */ 00258 virtual Object::Holder remove(); 00259 00260 00261 // ----- data members --------------------------------------------------- 00262 00263 protected: 00264 /** 00265 * The AtomicLong used to maintain the FlushState. See 00266 * getFlushState() and setFlushState() helper methods. 00267 */ 00268 FinalHandle<AtomicCounter> m_hAtomicFlushState; 00269 00270 /** 00271 * The queue size at which to auto-flush the queue during an add 00272 * operation. If the BatchSize is greater then one, the caller must 00273 * externally call flush() when it has finished adding elements in 00274 * order to ensure that they may be processed by any waiting consumer 00275 * thread. 00276 */ 00277 int32_t m_iBatchSize; 00278 00279 /** 00280 * A counter for maintaining the size of the queue. 00281 */ 00282 FinalHandle<AtomicCounter> m_hElementCounter; 00283 00284 /** 00285 * The monitor on which notifications related to a queue addition 00286 * will be performed. The default value is the Queue itself. The 00287 * Notifier should not be changed while the queue is in use. If the 00288 * Notifier is null then notification will be disabled. 00289 */ 00290 MemberHandle<Object> m_hNotifier; 00291 00292 /** 00293 * The total number of times the queue transitioned to the empty 00294 * state. 00295 */ 00296 int64_t m_lStatsEmptied; 00297 00298 /** 00299 * The total number of times the queue has been flushed. 00300 */ 00301 int64_t m_lStatusFlushed; 00302 00303 /** 00304 * Indicate whether the notifier references itself 00305 */ 00306 bool m_fSelfNotifier; 00307 }; 00308 00309 COH_CLOSE_NAMESPACE2 00310 00311 #endif // COH_ABSTRACT_CONCURRENT_QUEUE