Class WorkManager
- java.lang.Object
-
- com.tangosol.util.Base
-
- com.tangosol.coherence.commonj.WorkManager
-
- All Implemented Interfaces:
commonj.work.WorkManager
public class WorkManager extends Base implements commonj.work.WorkManager
An implementation of commonj.work.WorkManager based on a joint BEA-IBM specification: "The Timer and Work Manager for Application Servers", which is a foundation of JSR 237: Work Manager for Application Servers.Implementation notes:
- The WorkManager is constructed using a unique name; there could be one and only one manager per cluster node with this name. This name will also be the name of the underlying InvocationService instance used by the WorkManager. The WorkManagers running on different cluster nodes that have the same name will form a pool of available WorkManager threads.
- The WorkManager is constructed using a number of threads dedicated for Work processing; if this value is zero, this WorkManager instance will run in a "client-only" role, meaning that it will not accept any work load sent by other WorkManager clients.
- The WorkManage could be constructed with a specific ConfigurableCacheFactory. In this case, the WorkManager name will be used to instantiate the corresponding InvocationService.
- This WorkManager implementation allows client to request current
"work in progress" feedback via
getCurrentResult(commonj.work.WorkItem)
method. - The specification does not explicitly specify what response should be if a remote server crashes during an execution. Current implementation uses WORK_COMPLETED with WorkCompletedException as a result. It's important to understand that in a generic case it's impossible to know whether or not a Work has been completed.
- If a Work originator Member terminates before the Work gets started, the remote server will still run it.
- Only serializable Work (standard or POF) is currently supported.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected static class
WorkManager.AbstractWork
Base class for Invocable tasks related to the WorkManager.protected static class
WorkManager.CollectMembershipInfo
Invocable task used during handshake, which has to be scheduledimmediately
.protected static class
WorkManager.ReleaseWork
Invocable task that forces a release for a wrapped Work object.protected class
WorkManager.RemoteWorkManager
RemoteWorkManager is a delegating WorkManager wrapper.protected static class
WorkManager.RequestStatus
Invocable task that requests a status for a given Work.protected static class
WorkManager.ScheduleWork
Invocable task that posts a wrapped Work object to a WorkManager queue.protected static class
WorkManager.ScheduleWorkRejectedException
The ScheduleWorkRejectedException is a WrapperException that carries the information about the rejected Work.protected static class
WorkManager.SendFeedback
Invocable task that delivers a WorkEvent to a WorkListener.protected class
WorkManager.ServiceListener
The member listener for the InvocationService used by the WorkManager.protected class
WorkManager.WorkHolder
A holder for a Work object that serves as a communication intermediary between a client and a [remote] service.protected class
WorkManager.WorkObserver
The InvocationObserver for all posted work.protected static class
WorkManager.WorkStatus
A WorkStatus object is used to communicate a work execution status back to the caller.-
Nested classes/interfaces inherited from class com.tangosol.util.Base
Base.LoggingWriter
-
-
Field Summary
Fields Modifier and Type Field Description protected InvocationObserver
m_workObserver
The ScheduleWork agent observer.
-
Constructor Summary
Constructors Constructor Description WorkManager(String sManagerName, int cThreads)
Construct the WorkManager.WorkManager(String sManagerName, ConfigurableCacheFactory factory)
Construct the WorkManager using the specified ConfigurableCacheFactory.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description void
cancelPostedWork()
Cancel all posted work that has not yet completed.protected Member
chooseServer()
Choose a Member to run a next Work at.protected void
collectMembershipInfo()
Inform all the members about this node's role (client or server) and collect the information regarding other nodes' rolesvoid
disableServer(Member member)
Remove a Member from a list of active server nodes and terminate all work posted to that server.protected int
generateWorkId()
Generate a unique (for this WorkManager) work id.Map<Member,Map<Long,WorkManager.ScheduleWork>>
getAcceptedWork()
Return a catalog of accepted ScheduleWork tasks as known by the server side keyed by the origin Member where the value is a map of ScheduleWork tasks keyed by the WorkId.WorkManager.WorkStatus
getCurrentResult(commonj.work.WorkItem item)
Retrieve a current WorkStatus (partial result) for a given Work.Map<Long,WorkManager.WorkHolder>
getPostedWork()
Return a catalog of WorkHolder objects for all posted Work as known by the client side keyed by the corresponding work id.protected WorkManager.WorkHolder
getPostedWork(long lWorkId)
Return a WorkHolder for a posted work with a given id.Set<Member>
getServers()
Return a set of Members that run the same Invocation service as this WorkManager in a "server" capacity.InvocationService
getService()
Return an InvocationService used by this WorkManager.protected void
init(String sManagerName, int cThreads, ConfigurableCacheFactory factory)
Initialization.protected void
initWork(WorkManager.ScheduleWork workAgent)
Initialize the remote work agent.protected void
installMemberListener()
Install a MemberListener.protected void
instantiateWorkObserver()
Instantiate a Work observer.boolean
isServer()
Check whether or not this node operates in a "server" mode processing remote Work requests.static void
main(String[] asArg)
Entry point: start a stand-alone WorkManager (server) using the specified name and thread count.protected void
processFeedback(WorkManager.SendFeedback workFeedback)
Process a remote work feedback.protected WorkManager.WorkStatus
processWork(WorkManager.ScheduleWork workAgent)
Run the remote work agent.protected void
releaseWork(WorkManager.ReleaseWork releaseAgent)
Release the remote work agent.protected WorkManager.WorkHolder
removePostedWork(long lWorkId)
Remove a WorkHolder for a posted work with a given id.protected WorkManager.WorkStatus
requestWorkStatus(WorkManager.RequestStatus requestAgent)
Obtain current status of the specified Work.commonj.work.WorkItem
schedule(commonj.work.Work work)
Dispatches a Work asynchronously.commonj.work.WorkItem
schedule(commonj.work.Work work, commonj.work.WorkListener workListener)
Dispatches a Work asynchronously.protected commonj.work.WorkItem
schedule(commonj.work.Work work, commonj.work.WorkListener workListener, Member member)
Accepts a Work instance for processing at the specified Member.protected boolean
updateWork(WorkManager.WorkStatus status)
Update a WorkStatus for a work.boolean
waitForAll(Collection collWorkItems, long lTimeoutMillis)
Wait for all WorkItems in the collection to finish.Collection
waitForAny(Collection collWorkItems, long lTimeoutMillis)
Wait for any of the WorkItems in the collection to finish.-
Methods inherited from class com.tangosol.util.Base
azzert, azzert, azzert, azzertFailed, breakLines, breakLines, capitalize, checkNotEmpty, checkNotNull, checkRange, computeSafeWaitTime, decimalValue, dup, dup, ensureBigDecimal, ensureClassLoader, ensureRuntimeException, ensureRuntimeException, equals, equalsDeep, err, err, err, err, err, escape, formatDateTime, getCallerStackFrame, getCommonMonitor, getCommonMonitor, getCommonMonitor, getContextClassLoader, getContextClassLoader, getDeepMessage, getErr, getLastSafeTimeMillis, getLog, getMaxDecDigits, getMaxHexDigits, getOriginalException, getOut, getProcessRandom, getRandom, getRandomBinary, getRandomBinary, getRandomString, getSafeTimeMillis, getStackFrame, getStackFrames, getStackTrace, getStackTrace, getStackTrace, getThreadFactory, getTimeZone, getUpTimeMillis, hashCode, hexValue, indentString, indentString, isDecimal, isHex, isLogEcho, isOctal, log, log, log, log, log, makeInteger, makeLong, makeThread, mergeArray, mergeBooleanArray, mergeByteArray, mergeCharArray, mergeDoubleArray, mergeFloatArray, mergeIntArray, mergeLongArray, mod, mod, octalValue, out, out, out, out, out, pad, parseBandwidth, parseBandwidth, parseDelimitedString, parseHex, parseHex, parseMemorySize, parseMemorySize, parsePercentage, parseTime, parseTime, parseTimeNanos, parseTimeNanos, printStackTrace, randomize, randomize, randomize, randomize, read, read, read, read, read, read, read, replace, setErr, setLog, setLogEcho, setOut, sleep, toBandwidthString, toBandwidthString, toCharEscape, toCrc, toCrc, toCrc, toCrc, toCrc, toDecString, toDelimitedString, toDelimitedString, toDelimitedString, toDelimitedString, toHex, toHex, toHexDump, toHexEscape, toHexEscape, toHexEscape, toHexEscape, toHexString, toMemorySizeString, toMemorySizeString, toQuotedCharEscape, toQuotedStringEscape, toSqlString, toString, toString, toStringEscape, toUnicodeEscape, trace, trace, trace, trace, trace, trace, trace, trace, trace, truncateString, truncateString, wait
-
-
-
-
Field Detail
-
m_workObserver
protected InvocationObserver m_workObserver
The ScheduleWork agent observer.
-
-
Constructor Detail
-
WorkManager
public WorkManager(String sManagerName, int cThreads)
Construct the WorkManager.- Parameters:
sManagerName
- a unique WorkManager namecThreads
- number of threads dedicated for Work processing; zero for WorkManagers in a "client-only" role
-
WorkManager
public WorkManager(String sManagerName, ConfigurableCacheFactory factory)
Construct the WorkManager using the specified ConfigurableCacheFactory.- Parameters:
sManagerName
- a unique WorkManager namefactory
- ConfigurableCacheFactory that will be used to instantiate the corresponding InvocationService using thefactory.ensureService(sManagerName)
call
-
-
Method Detail
-
init
protected void init(String sManagerName, int cThreads, ConfigurableCacheFactory factory)
Initialization.- Parameters:
sManagerName
- a unique WorkManager namecThreads
- number of threadsfactory
- ConfigurableCacheFactory to use
-
installMemberListener
protected void installMemberListener()
Install a MemberListener.
-
instantiateWorkObserver
protected void instantiateWorkObserver()
Instantiate a Work observer.
-
collectMembershipInfo
protected void collectMembershipInfo()
Inform all the members about this node's role (client or server) and collect the information regarding other nodes' roles
-
schedule
public commonj.work.WorkItem schedule(commonj.work.Work work) throws commonj.work.WorkException
Dispatches a Work asynchronously. The work is dispatched and the method returns immediately.At-most-once semantics are provided. If the server fails then the Work will not be executed on restart.
- Specified by:
schedule
in interfacecommonj.work.WorkManager
- Parameters:
work
- the Work to execute.- Returns:
- the WorkItem representing the asynchronous work; since the Work must be serializable, a RemoteWorkItem is always returned
- Throws:
commonj.work.WorkException
- thrown if queuing this up results in an exception
-
schedule
public commonj.work.WorkItem schedule(commonj.work.Work work, commonj.work.WorkListener workListener) throws commonj.work.WorkException
Dispatches a Work asynchronously. The work is dispatched and the method returns immediately.At-most-once semantics are provided. If the server fails then the Work will not be executed on restart.
- Specified by:
schedule
in interfacecommonj.work.WorkManager
- Parameters:
work
- the Work to executeworkListener
- an optional WorkListener which is used to inform the application of the progress of a Work- Returns:
- the WorkItem representing the asynchronous work; since the Work must be serializable, a RemoteWorkItem is always returned
- Throws:
commonj.work.WorkException
- thrown if queuing this up results in an exception
-
schedule
protected commonj.work.WorkItem schedule(commonj.work.Work work, commonj.work.WorkListener workListener, Member member) throws commonj.work.WorkException
Accepts a Work instance for processing at the specified Member.- Parameters:
work
- the Work to executeworkListener
- an optional WorkListener which is used to inform the application of the progress of a Workmember
- the Member to execute the Work at- Returns:
- a WorkItem representing scheduled Work
- Throws:
commonj.work.WorkException
- thrown if queuing this up results in an exception
-
waitForAll
public boolean waitForAll(Collection collWorkItems, long lTimeoutMillis) throws InterruptedException
Wait for all WorkItems in the collection to finish. If there are no WorkItems in the list then it returns immediately indicating a timeout. WorkItems from different WorkManagers can be placed in a single collection and waited on together.The WorkItems collection should not be altered once submitted until the method returns.
- Specified by:
waitForAll
in interfacecommonj.work.WorkManager
- Parameters:
collWorkItems
- the Collection of WorkItem objects to wait forlTimeoutMillis
- the timeout in milliseconds. If this value is zero then this method returns immediately- Returns:
- true if all WorkItems have completed; false if the timeout has expired
- Throws:
InterruptedException
- thrown if the wait is interruptedIllegalArgumentException
- thrown if workItems is null, any of the objects in the collection are not WorkItems or the timeout is negative
-
waitForAny
public Collection waitForAny(Collection collWorkItems, long lTimeoutMillis) throws InterruptedException
Wait for any of the WorkItems in the collection to finish. If there are no WorkItems in the list then it returns immediately indicating a timeout. WorkItems from different WorkManagers can be placed in a single collection and waited on together.The WorkItems collection should not be altered once submitted until the method returns.
- Specified by:
waitForAny
in interfacecommonj.work.WorkManager
- Parameters:
collWorkItems
- the Collection of WorkItem objects to wait forlTimeoutMillis
- the timeout in ms. If this value is zero then the method returns immediately, i.e. does not block- Returns:
- the WorkItems that have completed or an empty Collection if its timeout expires before any finished
- Throws:
InterruptedException
- thrown if the wait is interruptedIllegalArgumentException
- thrown if workItems is null, any of the objects in the collection are not WorkItems or the timeout is negative
-
getCurrentResult
public WorkManager.WorkStatus getCurrentResult(commonj.work.WorkItem item)
Retrieve a current WorkStatus (partial result) for a given Work.If the specified work has not been rejected or completed, this method will always communicate with a remote server to retrieve the current WorkStatus.
- Parameters:
item
- the WorkItem to retrieve the status for- Returns:
- the corresponding WorkStatus object
-
getService
public InvocationService getService()
Return an InvocationService used by this WorkManager.- Returns:
- an InvocationService used by this WorkManager
-
getServers
public Set<Member> getServers()
Return a set of Members that run the same Invocation service as this WorkManager in a "server" capacity.- Returns:
- a set of server Members
-
getPostedWork
public Map<Long,WorkManager.WorkHolder> getPostedWork()
Return a catalog of WorkHolder objects for all posted Work as known by the client side keyed by the corresponding work id.- Returns:
- the Map of posted WorkHolder objects
-
getAcceptedWork
public Map<Member,Map<Long,WorkManager.ScheduleWork>> getAcceptedWork()
Return a catalog of accepted ScheduleWork tasks as known by the server side keyed by the origin Member where the value is a map of ScheduleWork tasks keyed by the WorkId.- Returns:
- a catalog of started ScheduleWork tasks
-
isServer
public boolean isServer()
Check whether or not this node operates in a "server" mode processing remote Work requests.- Returns:
- true if this node operates in a "server" mode processing remote Work requests; false otherwise
-
disableServer
public void disableServer(Member member)
Remove a Member from a list of active server nodes and terminate all work posted to that server. This method could be called by the client code in response to exceptions (i.e. serialization problems) thrown by a server that deem further use of that server impossible.- Parameters:
member
- the Member to remove from the server list
-
cancelPostedWork
public void cancelPostedWork()
Cancel all posted work that has not yet completed.
-
chooseServer
protected Member chooseServer()
Choose a Member to run a next Work at.Called by client threads.
- Returns:
- the Member to run a next Work at
-
getPostedWork
protected WorkManager.WorkHolder getPostedWork(long lWorkId)
Return a WorkHolder for a posted work with a given id.- Parameters:
lWorkId
- a work id- Returns:
- a WorkHolder for a given work id
-
removePostedWork
protected WorkManager.WorkHolder removePostedWork(long lWorkId)
Remove a WorkHolder for a posted work with a given id.- Parameters:
lWorkId
- a work id- Returns:
- a WorkHolder for a given work id
-
generateWorkId
protected int generateWorkId()
Generate a unique (for this WorkManager) work id.Called by client threads.
- Returns:
- the generated Work id
-
processFeedback
protected void processFeedback(WorkManager.SendFeedback workFeedback)
Process a remote work feedback.Called by the InvocationService on the service thread at the client VM.
- Parameters:
workFeedback
- the feedback Work object
-
updateWork
protected boolean updateWork(WorkManager.WorkStatus status)
Update a WorkStatus for a work. If the corresponding WorkHolder has already been released, no action takes place.Called by the InvocationService on a service thread of the client VM.
- Parameters:
status
- a WorkStatus object with an new status- Returns:
- true iff the status was updated; false if the corresponding work has already been released
-
initWork
protected void initWork(WorkManager.ScheduleWork workAgent)
Initialize the remote work agent.Called by the InvocationService on the service thread of the server member.
- Parameters:
workAgent
- the agent Work object
-
processWork
protected WorkManager.WorkStatus processWork(WorkManager.ScheduleWork workAgent)
Run the remote work agent.Called by a daemon thread of the InvocationService on a server VM.
- Parameters:
workAgent
- the agent Work object- Returns:
- the WorkStatus object
-
requestWorkStatus
protected WorkManager.WorkStatus requestWorkStatus(WorkManager.RequestStatus requestAgent)
Obtain current status of the specified Work.Called by a daemon thread of the InvocationService on a server VM.
- Parameters:
requestAgent
- the request status- Returns:
- the corresponding WorkStatus object
-
releaseWork
protected void releaseWork(WorkManager.ReleaseWork releaseAgent)
Release the remote work agent.Called by a daemon thread of the InvocationService on a server VM.
- Parameters:
releaseAgent
- the release work
-
main
public static void main(String[] asArg)
Entry point: start a stand-alone WorkManager (server) using the specified name and thread count.Example: java com.tangosol.coherence.commonj.WorkManager Manager 5
- Parameters:
asArg
- the command line arguments
-
-