/*
 * AbstractConcurrentQueue.hpp
 *
 * Copyright (c) 2000, 2014, Oracle and/or its affiliates. All rights reserved.
 *
 * Oracle is a registered trademarks of Oracle Corporation and/or its
 * affiliates.
 *
 * This software is the confidential and proprietary information of Oracle
 * Corporation. You shall not disclose such confidential and proprietary
 * information and shall use it only in accordance with the terms of the
 * license agreement you entered into with Oracle.
 *
 * This notice may not be removed or altered.
 */
#ifndef COH_ABSTRACT_CONCURRENT_QUEUE
#define COH_ABSTRACT_CONCURRENT_QUEUE

#include "coherence/lang.ns"

#include "coherence/util/Queue.hpp"

COH_OPEN_NAMESPACE2(coherence,util)


/**
* The AbstractConcurrentQueue provides a means to efficiently (and in a
* thread-safe manner) queue elements with minimal contention.
*
* Note: The AbstractConcurrentQueue does not support null entries.
*
* @author nsa 2008.01.19
*/
class COH_EXPORT AbstractConcurrentQueue
    : public abstract_spec<AbstractConcurrentQueue,
        extends<Object>,
        implements<Queue> >
    {
    // ----- enums ----------------------------------------------------------

    public:
        /**
        * The FlushState values are used to indicate the state of the Queue
        * with regards to flushing:
        *
        * <ul>
        * <li>
        * flush_pending:  Indicates that a flush is pending. At
        *                 some point the queue will flush itself
        *                 automatically or the consumer will explicitly
        *                 flush the queue.
        * </li>
        * <li>
        * flush_auto:     Indicates that no flush is pending as the queue
        *                 has been auto flushed. This state will not be
        *                 reset to flush_pending until the queue has been
        *                 cleared and the producer has added a new
        *                 element.
        * </li>
        * <li>
        * flush_explicit: Indicates that no flush is pending as the queue
        *                 has been explicitly flushed. This state will
        *                 not be reset to flush_pending until the queue
        *                 has been cleared and the producer has added a
        *                 new element, or if the producer calls flush
        *                 multiple times before the queue has
        *                 been cleared.
        * </li>
        * </ul>
        */
        typedef enum
            {
            flush_pending  = 0,
            flush_auto     = 1,
            flush_explicit = 2
            } FlushState;


    // ----- constructors ---------------------------------------------------

    protected:
        /**
        * @internal
        */
        AbstractConcurrentQueue();


    // ----- AbstractConcurrentQueue interface ------------------------------

    public:
        /**
        * Return the current flush state.
        *
        * @return the current flush state
        */
        virtual FlushState getFlushState() const;

        /**
        * Return whether a flush is pending or not.
        *
        * @return true if a flush is pending
        */
        virtual bool isFlushPending() const;

        /**
        * Set the batch size of the queue.
        *
        * @param cBatch  the batch size to set
        */
        virtual void setBatchSize(int32_t cBatch);

        /**
        * Return the batch size of the queue.
        *
        * @return the batch size of the queue
        */
        virtual int32_t getBatchSize() const;

        /**
        * Return the object used for notifications on this queue.
        *
        * @return the object used for notifications on this queue
        */
        virtual Object::Handle getNotifier();

        /**
        * Set the object used for notifications on this queue.
        *
        * @param hNotifier  the object used for notifications on this queue
        */
        virtual void setNotifier(Object::Handle hNotifier);

        /**
        * Wait for the queue to contain at least one entry.
        *
        * Note: By the time the method returns the entry may have already
        * been removed by another thread.
        *
        * @param cMillis  the number of milliseconds to wait before returning
        *                 without having been notified, or 0 to wait until
        *                 notified
        */
        virtual void waitForEntry(int64_t cMillis);

        /**
        * Return the total number of times the queue has been emptied.
        *
        * @return the total number of times the queue has been emptied
        */
        virtual int64_t getStatsEmptied() const;

        /**
        * Return the total number of times the queue has been flushed.
        *
        * @return the total number of times the queue has been flushed
        */
        virtual int64_t getStatsFlushed() const;


    // ----- internal helpers -----------------------------------------------

    protected:
        /**
        * Check whether or not the flush (notify) is necessary. This method
        * is always called when a new item is added to the queue.
        *
        * @param cElements  the number of elements in the queue after the
        *                   addition
        */
        virtual void checkFlush(int32_t cElements);

        /**
        * Flush the queue.
        *
        * @param fAuto  iff the flush was invoked automatically based on the
        *               notification batch size
        */
        virtual void flush(bool fAuto);

        /**
        * Event called each time an element is added to the queue.
        */
        virtual void onAddElement();

        /**
        * Event called when the queue becomes empty.
        */
        virtual void onEmpty();

        /**
        * Set the flush state and return the previous state.
        *
        * @param nState  the state to set
        *
        * @return the previous flush state
        */
        virtual FlushState updateFlushState(FlushState nState);

        /**
        * Set the flush state iff the assumed state is correct, return the
        * previous flush state.
        *
        * @param nStateAssumed  the assumed current value of the flush state
        * @param nStateNew      the new flush state value
        *
        * @return the old flush state value
        */
        virtual FlushState updateFlushStateConditionally(FlushState nStateAssumed,
                FlushState nStateNew);

        /**
        * Set the total number of times the queue has been emptied.
        *
        * @param cEmpties  the total number of times the queue has been
        *                  emptied
        */
        virtual void setStatsEmptied(int64_t cEmpties);

        /**
        * Set the total number of times the queue has been flushed.
        *
        * @param cFlushed  the total number of times the queue has been
        *                  flushed
        */
        virtual void setStatsFlushed(int64_t cFlushed);


    // ----- Queue interface ------------------------------------------------

    public:
        /**
        * {@inheritDoc}
        */
        virtual void flush();

        /**
        * {@inheritDoc}
        */
        virtual size32_t size() const;

        /**
        * {@inheritDoc}
        */
        virtual Object::Holder remove();


    // ----- data members ---------------------------------------------------

    protected:
        /**
        * The NativeAtomic64 used to maintain the FlushState. See the
        * getFlushState(), updateFlushState() and
        * updateFlushStateConditionally() helper methods.
        */
        NativeAtomic64 m_nAtomicFlushState;

        /**
        * The queue size at which to auto-flush the queue during an add
        * operation. If the BatchSize is greater than one, the caller must
        * externally call flush() when it has finished adding elements in
        * order to ensure that they may be processed by any waiting consumer
        * thread.
        */
        int32_t m_iBatchSize;

        /**
        * A counter for maintaining the size of the queue.
        */
        NativeAtomic64 m_nElementCounter;

        /**
        * The monitor on which notifications related to a queue addition
        * will be performed. The default value is the Queue itself. The
        * Notifier should not be changed while the queue is in use. If the
        * Notifier is null then notification will be disabled.
        */
        FinalHandle<Object> f_hNotifier;

        /**
        * The total number of times the queue transitioned to the empty
        * state.
        */
        int64_t m_lStatsEmptied;

        /**
        * The total number of times the queue has been flushed.
        */
        int64_t m_lStatusFlushed;

        /**
        * Indicate whether the notifier references itself.
        */
        bool m_fSelfNotifier;
    };

COH_CLOSE_NAMESPACE2

#endif // COH_ABSTRACT_CONCURRENT_QUEUE