Oracle® Fusion Middleware C++ API Reference for Oracle Coherence
12c (12.1.3.0.0)

E47891-01

coherence/net/cache/AbstractBundler.hpp

00001 /*
00002 * AbstractBundler.hpp
00003 *
00004 * Copyright (c) 2000, 2014, Oracle and/or its affiliates. All rights reserved.
00005 *
00006 * Oracle is a registered trademarks of Oracle Corporation and/or its
00007 * affiliates.
00008 *
00009 * This software is the confidential and proprietary information of Oracle
00010 * Corporation. You shall not disclose such confidential and proprietary
00011 * information and shall use it only in accordance with the terms of the
00012 * license agreement you entered into with Oracle.
00013 *
00014 * This notice may not be removed or altered.
00015 */
00016 #ifndef COH_ABSTRACT_BUNDLER_HPP
00017 #define COH_ABSTRACT_BUNDLER_HPP
00018 
00019 #include "coherence/lang.ns"
00020 
00021 #include "coherence/net/NamedCache.hpp"
00022 #include "coherence/util/ArrayList.hpp"
00023 #include "coherence/util/AtomicCounter.hpp"
00024 #include "coherence/util/List.hpp"
00025 
00026 COH_OPEN_NAMESPACE3(coherence,net,cache)
00027 
00028 using coherence::net::NamedCache;
00029 using coherence::util::ArrayList;
00030 using coherence::util::AtomicCounter;
00031 using coherence::util::List;
00032 
00033 /**
00034  * An abstract base for processors that implement bundling strategy.
00035  *
00036  * Assume that we receive a continuous and concurrent stream of individual
00037  * operations on multiple threads in parallel. Let's also assume those individual
00038  * operations have relatively high latency (network or database-related) and
00039  * there are functionally analogous [bulk] operations that take a collection of
00040  * arguments instead of a single one without causing the latency to grow
00041  * linearly, as a function of the collection size. Examples of operations and
00042  * topologies that satisfy these assumptions are:
00043  * <ul>
00044  *   <li> get() and getAll() methods for the NamedCache API for the
00045  *        partitioned cache service topology;
00046  *   <li> put() and putAll() methods for the NamedCache API for the
00047  *        partitioned cache service topology;
00048  * </ul>
00049  *
00050  * Under these assumptions, it's quite clear that the bundler could achieve a
00051  * better utilization of system resources and better throughput if slightly
00052  * delays the individual execution requests with a purpose of "bundling" them
00053  * together and passing int32_to a corresponding bulk operation. Additionally,
00054  * the "bundled" request should be triggered if a bundle reaches a "preferred
00055  * bundle size" threshold, eliminating a need to wait till a bundle timeout is
00056  * reached.
00057  *
00058  * Note: we assume that all bundle-able operations are idempotent and could be
00059  * repeated if un-bundling is necessary due to a bundled operation failure.
00060  *
00061  * @author gg 2007.01.28
00062  * @author lh 2012.06.05
00063  * @since Coherence 12.1.2
00064  */
00065 class COH_EXPORT AbstractBundler
00066     : public abstract_spec<AbstractBundler>
00067     {
00068     // ----- handle definitions (needed for nested classes) -----------------
00069 
00070     public:
00071         typedef this_spec::Handle Handle;
00072         typedef this_spec::View   View;
00073         typedef this_spec::Holder Holder;
00074 
00075     // ----- constructors ---------------------------------------------------
00076 
00077     /**
00078      * Construct the bundler. By default, the timeout delay value is set to
00079      * one millisecond and the auto-adjustment feature is turned on.
00080      */
00081     protected:
00082         AbstractBundler();
00083 
00084     private:
00085         /**
00086          * Blocked copy constructor.
00087          */
00088         AbstractBundler(const AbstractBundler&);
00089 
00090     // ----- property accessors ---------------------------------------------
00091 
00092     public:
00093         /**
00094          * Obtain the bundle size threshold value.
00095          *
00096          * @return the bundle size threshold value expressed in the same units
00097          *         as the value returned by the {@link Bundle::getBundleSize()}
00098          *         method
00099          */
00100         virtual int32_t getSizeThreshold() const;
00101 
00102         /**
00103          * Specify the bundle size threshold value.
00104          *
00105          * @param cSize  the bundle size threshold value; must be positive
00106          *               value expressed in the same units as the value returned
00107          *               by the {@link Bundle::getBundleSize()} method
00108          */
00109         virtual void setSizeThreshold(int32_t cSize);
00110 
00111         /**
00112          * Obtains the minimum number of threads that will trigger the bundler
00113          * to switch from a pass through to a bundled mode.
00114          *
00115          * @return a the number of threads threshold
00116          */
00117         virtual int32_t getThreadThreshold() const;
00118 
00119         /**
00120          * Specify the minimum number of threads that will trigger the bundler to
00121          * switch from a pass through to a bundled mode.
00122          *
00123          * @param cThreads  the number of threads threshold
00124          */
00125         virtual void setThreadThreshold(int32_t cThreads);
00126 
00127         /**
00128          * Obtain the timeout delay value.
00129          *
00130          * @return the timeout delay value in milliseconds
00131          */
00132         virtual int64_t getDelayMillis() const;
00133 
00134         /**
00135          * Specify the timeout delay value.
00136          *
00137          * @param lDelay  the timeout delay value in milliseconds
00138          */
00139         virtual void setDelayMillis(int64_t lDelay);
00140 
00141         /**
00142          * Check whether or not auto-adjustment is allowed.
00143          *
00144          * @return true if auto-adjustment is allowed
00145          */
00146         virtual bool isAllowAutoAdjust() const;
00147 
00148         /**
00149          * Specify whether or not auto-adjustment is allowed.
00150          *
00151          * @param fAutoAdjust  true if auto-adjustment should be allowed;
00152          *                     false otherwise
00153          */
00154         virtual void setAllowAutoAdjust(bool fAutoAdjust);
00155 
00156     // ----- statistics ------------------------------------------------------
00157 
00158     protected:
00159         /**
00160          * Update the statistics for this Bundle.
00161          */
00162         virtual void updateStatistics();
00163 
00164     public:
00165         /**
00166          * Reset this Bundler statistics.
00167          */
00168         virtual void resetStatistics();
00169 
00170         /**
00171          * Adjust this Bundler's parameters according to the available
00172          * statistical information.
00173          */
00174         virtual void adjust();
00175 
00176     // ----- Object interface -----------------------------------
00177 
00178     public:
00179         /**
00180          * {@inheritDoc}
00181          */
00182         virtual void toStream(std::ostream& out) const;
00183 
00184     // ----- inner class: Bundle --------------------------------------------
00185 
00186     /**
00187      * Bundle represents a unit of optimized execution.
00188      */
00189     protected:
00190         class COH_EXPORT Bundle
00191             : public abstract_spec<Bundle>
00192             {
00193             friend class factory<Bundle>;
00194             friend class AbstractBundler;
00195 
00196             // ----- constructors ---------------------------------------------------
00197 
00198             /**
00199              * Default constructor.
00200              *
00201              * @param hBundler  the AbstructBundler
00202              */
00203             protected:
00204                 Bundle(AbstractBundler::Handle hBundler);
00205 
00206             // ----- accessors -------------------------------------------------
00207 
00208             public:
00209                 /**
00210                  * Check whether or not this bundle is open for adding request elements.
00211                  *
00212                  * @return true if this Bundle is still open
00213                  */
00214                 virtual bool isOpen() const;
00215 
00216                 /**
00217                  * Return the Bundler.
00218                  *
00219                  * @return the Bundler
00220                  */
00221                 virtual AbstractBundler::Handle getBundler();
00222 
00223             protected:
00224                 /**
00225                  * Check whether or not this bundle is in the "pending" state -
00226                  * awaiting for the execution results.
00227                  *
00228                  * @return true if this Bundle is in the "pending" state
00229                  */
00230                 virtual bool isPending() const;
00231 
00232                 /**
00233                  * Check whether or not this bundle is in the "processed" state -
00234                  * ready to return the result of execution back to the client.
00235                  *
00236                  * @return true if this Bundle is in the "processed" state
00237                  */
00238                 virtual bool isProcessed() const;
00239 
00240                 /**
00241                  * Check whether or not this bundle is in the "exception" state -
00242                  * bundled execution threw an exception and requests have to be
00243                  * un-bundled.
00244                  *
00245                  * @return true if this Bundle is in the "exception" state
00246                  */
00247                 virtual bool isException() const;
00248 
00249                 /**
00250                  * Change the status of this Bundle.
00251                  *
00252                  * @param iStatus  the new status value
00253                  */
00254                 virtual void setStatus(int32_t iStatus);
00255 
00256                 /**
00257                  * Obtain this bundle size. The return value should be expressed in the
00258                  * same units as the value returned by the
00259                  * {@link AbstractBundler::getSizeThreshold getSizeThreshold} method.
00260                  *
00261                  * @return the bundle size
00262                  */
00263                 virtual int32_t getBundleSize() const;
00264 
00265                 /**
00266                  * Check whether or not this is a "master" Bundle.
00267                  *
00268                  * @return true if this Bundle is a designated "master" Bundle
00269                  */
00270                 virtual bool isMaster() const;
00271 
00272                 /**
00273                  * Designate this Bundle as a "master" bundle.
00274                  */
00275                 virtual void setMaster();
00276 
00277                 // ----- processing and subclassing support --------------------------
00278 
00279             public:
00280                 /**
00281                  * Wait until results of bundled requests are retrieved.
00282                  * 
00283                  * Note that calls to this method must be externally synchronized.
00284                  *
00285                  * @param fFirst  true if this is the first thread entering the bundle
00286                  *
00287                  * @return true if this thread is supposed to perform an actual bundled
00288                  *         operation (burst); false otherwise
00289                  */
00290                 virtual bool waitForResults(bool fFirst);
00291 
00292             protected:
00293                 /**
00294                  * Obtain results of the bundled requests. This method should be
00295                  * implemented by concrete Bundle implementations using the most
00296                  * efficient mechanism.
00297                  */
00298                 virtual void ensureResults() = 0;
00299 
00300                 /**
00301                  * Obtain results of the bundled requests or ensure that the results
00302                  * have already been retrieved.
00303                  *
00304                  * @param fBurst  specifies whether or not the actual results have to be
00305                  *                fetched on this thread; this parameter will be true
00306                  *                for one and only one thread per bundle
00307                  *
00308                  * @return true if the bundling has succeeded; false if the un-bundling
00309                  *         has to be performed as a result of a failure
00310                  */
00311                 virtual bool ensureResults(bool fBurst);
00312 
00313                 /**
00314                  * Release all bundle resources associated with the current thread.
00315                  *
00316                  * @return true if all entered threads have released
00317                  */
00318                 virtual bool releaseThread();
00319 
00320             // ----- statistics and debugging  -----------------------------------
00321 
00322             public:
00323                 /**
00324                  * Reset statistics for this Bundle.
00325                  */
00326                 virtual void resetStatistics();
00327 
00328             // ----- Object interface -----------------------------------
00329 
00330             protected:
00331                 /**
00332                  * {@inheritDoc}
00333                  */
00334                 virtual void toStream(std::ostream& out) const;
00335 
00336             protected:
00337                 /**
00338                  * Return a human readable name for the specified status value.
00339                  *
00340                  * @param iStatus  the status value to format
00341                  *
00342                  * @return a human readable status name
00343                  */
00344                 virtual String::View formatStatusName(int32_t iStatus) const;
00345 
00346             // ----- data fields and constants ---------------------------------
00347 
00348             public:
00349                 /**
00350                  * This Bundle accepting additional items.
00351                  */
00352                 static const int32_t status_open      = 0;
00353 
00354                 /**
00355                  * This Bundle is closed for accepting additional items and awaiting
00356                  * for the execution results.
00357                  */
00358                 static const int32_t status_pending   = 1;
00359 
00360                 /**
00361                  * This Bundle is in process of returning the result of execution
00362                  * back to the client.
00363                  */
00364                 static const int32_t status_processed = 2;
00365 
00366                 /**
00367                  * Attempt to bundle encountered and exception; the execution has to be
00368                  * de-optimized and performed by individual threads.
00369                  */
00370                 static const int32_t status_exception = 3;
00371 
00372             private:
00373                 /**
00374                  * The bundler the operations are performed on.
00375                  */
00376                 FinalHandle<AbstractBundler> f_hBundler;
00377 
00378                 /**
00379                  * This Bundle status.
00380                  */
00381                 Volatile<int32_t> m_iStatus;
00382 
00383                 /**
00384                  * A count of threads that are using this Bundle.
00385                  */
00386                 int32_t m_cThreads;
00387 
00388                 /**
00389                  * A flag that differentiates the "master" bundle which is responsible
00390                  * for all auto-adjustments. It's set to "true" for one and only one
00391                  * Bundle object.
00392                  */
00393                 bool m_fMaster;
00394 
00395                 // stat fields intentionally have the "package private" access to
00396                 // prevent generation of synthetic access methods
00397 
00398             protected:
00399                 /**
00400                  * Statistics: the total number of times this Bundle has been used for
00401                  * bundled request processing.
00402                  */
00403                 Volatile<int64_t> m_cTotalBundles;
00404 
00405                 /**
00406                  * Statistics: the total size of individual requests processed by this
00407                  * Bundle expressed in the same units as values returned by the
00408                  * {@link Bundle::getBundleSize()} method.
00409                  */
00410                 Volatile<int64_t> m_cTotalSize;
00411 
00412                 /**
00413                  * Statistics: a timestamp of the first thread entering the bundle.
00414                  */
00415                 int64_t m_ldtStart;
00416 
00417                 /**
00418                  * Statistics: a total time duration this Bundle has spent in bundled
00419                  * request processing (burst).
00420                  */
00421                 Volatile<int64_t> m_cTotalBurstDuration;
00422 
00423                 /**
00424                  * Statistics: a total time duration this Bundle has spent waiting
00425                  * for bundle to be ready for processing.
00426                  */
00427                 Volatile<int64_t> m_cTotalWaitDuration;
00428             };
00429 
00430     // ----- inner class: Statistics -------------------------------------------
00431 
00432     /**
00433      * Statistics class contains the latest bundler statistics.
00434      */
00435     protected:
00436         class COH_EXPORT Statistics
00437             : public class_spec<Statistics>
00438             {
00439             friend class factory<Statistics>;
00440             friend class AbstractBundler;
00441 
00442             public:
00443                 /**
00444                  * Reset the statistics.
00445                  */
00446                 virtual void reset();
00447 
00448             // ----- Object interface -----------------------------------
00449 
00450             public:
00451                 /**
00452                  * {@inheritDoc}
00453                  */
00454                 virtual void toStream(std::ostream& out) const;
00455 
00456             // ----- running averages ------------------------------------------
00457 
00458             protected:
00459                 /**
00460                  * An average time for bundled request processing (burst).
00461                  */
00462                 int32_t m_cAverageBurstDuration;
00463 
00464                 /**
00465                  * An average bundle size for this Bundler.
00466                  */
00467                 int32_t m_cAverageBundleSize;
00468 
00469                 /**
00470                  * An average thread waiting time caused by under-filled bundle. The
00471                  * wait time includes the time spend in the bundled request processing.
00472                  */
00473                 int32_t m_cAverageThreadWaitDuration;
00474 
00475                 /**
00476                  * An average bundled request throughput in size units per millisecond
00477                  * (total bundle size over total processing time)
00478                  */
00479                 int32_t m_nAverageThroughput;
00480 
00481                 // ----- snapshots --------------------------------------------------
00482 
00483                 /**
00484                  * Snapshot for a total number of processed bundled.
00485                  */
00486                 int64_t m_cBundleCountSnapshot;
00487 
00488                 /**
00489                  * Snapshot for a total size of processed bundled.
00490                  */
00491                 int64_t m_cBundleSizeSnapshot;
00492 
00493                 /**
00494                  * Snapshot for a burst duration.
00495                  */
00496                 int64_t m_cBurstDurationSnapshot;
00497 
00498                 /**
00499                  * Snapshot for a combined thread waiting time.
00500                  */
00501                 int64_t m_cThreadWaitSnapshot;
00502             };
00503 
00504     // ----- sublcassing support --------------------------------------------
00505 
00506     protected:
00507         /**
00508          * Initialize the bundler.
00509          *
00510          * @param hBundle              specifies the bundle for this bundler
00511          */
00512         virtual void init(Bundle::Handle hBundle);
00513 
00514         /**
00515          * Retrieve any Bundle that is currently in the open state. This method
00516          * does not assume any external synchronization and as a result, a
00517          * caller must double check the returned bundle open state (after
00518          * synchronizing on it).
00519          *
00520          * @return an open Bundle
00521          */
00522         virtual Bundle::Handle getOpenBundle();
00523 
00524         /**
00525          * Instantiate a new Bundle object.
00526          *
00527          * @return a new Bundle object
00528          */
00529         virtual Bundle::Handle instantiateBundle() = 0;
00530 
00531     // ----- constants and data fields ---------------------------------------
00532 
00533     public:
00534         /**
00535          * Frequency of the adjustment attempts. This number represents a number of
00536          * iterations of the master bundle usage after which an adjustment attempt
00537          * will be performed.
00538          */
00539         static const int32_t adjustment_frequency = 128;
00540 
00541     protected:
00542         /**
00543          * Indicate whether this is a first time adjustment.
00544          */
00545         bool m_fFirstAdjustment;
00546 
00547         /**
00548          * The previous bundle size threshold value.
00549          */
00550         double m_dPreviousSizeThreshold;
00551 
00552         /**
00553          * A pool of Bundle objects. Note that this list never shrinks.
00554          */
00555         FinalHandle<List> f_hListBundle;
00556 
00557         /**
00558          * A counter for the total number of threads that have started any bundle
00559          * related execution. This counter is used by subclasses to reduce an impact
00560          * of bundled execution for lightly loaded environments.
00561          */
00562         FinalHandle<AtomicCounter> f_hCountThreads;
00563 
00564     private:
00565         /**
00566          * The bundle size threshold. We use double for this value to allow for
00567          * fine-tuning of the auto-adjust algorithm.
00568          *
00569          * @see adjust
00570          */
00571         double m_dSizeThreshold;
00572 
00573         /**
00574          * The minimum number of threads that should trigger the bundler to switch
00575          * from a pass through mode to a bundled mode.
00576          */
00577         int32_t m_cThreadThreshold;
00578 
00579         /**
00580          * Specifies whether or not auto-adjustment is on. Default value is "true".
00581          */
00582         bool m_fAllowAuto;
00583 
00584         /**
00585          * The delay timeout in milliseconds. Default value is one millisecond.
00586          */
00587         int64_t m_lDelayMillis;
00588 
00589         /**
00590          * Last active (open) bundle position.
00591          */
00592         Volatile<int32_t> m_iActiveBundle;
00593 
00594         /**
00595          * An instance of the Statistics object containing the latest statistics.
00596          */
00597         FinalHandle<Statistics> f_hStats;
00598     };
00599 
00600 COH_CLOSE_NAMESPACE3
00601 
00602 #endif // COH_ABSTRACT_BUNDLER_HPP
Copyright © 2000, 2014, Oracle and/or its affiliates. All rights reserved.