iotcs.client.device.util module¶
-
class
iotcs.client.device.util.
MessageDispatcher
¶ Bases:
abc.ABC
The MessageDispatcher queues messages for automatic dispatching.
Messages are dispatched to the Oracle IoT Cloud Service. The MessageDispatcher prioritizes message dispatching to ensure high priority messages are dispatched ahead of lower priority messages. There is one MessageDispatcher instance per client.
-
class
DeliveryCallback
¶ Bases:
abc.ABC
A callback interface for successful delivery of messages.
-
delivered
(messages)¶ Notify that messages have been successfully dispatched. This callback indicates that the messages have been delivered to, and accepted by, the server and are no longer queued. :param messages: the delivered messages.
-
-
class
ErrorCallback
¶ Bases:
abc.ABC
A callback interface called if errors occur during the delivery of messages.
-
failed
(messages, exception)¶ Notify that an error occurred when attempting to deliver messages to the server. This callback indicates that the messages have not been delivered to the server and are no longer queued. :param messages: the messages that were not delivered :param exception: the exception that was raised when attempting
to deliver the messages.
-
-
MESSAGE_DISPATCHER_URN
= 'urn:oracle:iot:dcd:capability:message_dispatcher'¶
-
close
()¶
-
classmethod
getMessageDispatcher
(dcd)¶ Return the instance of a
MessageDispatcher
for the dcd.This is an implementation detail and is called from the implementation of the virtualization API.
Parameters: dcd – is a DirectlyConnectedDevice
orGateway
connected device :return: AMessageDispatcherImpl
for dcd.
-
getRequestDispatcher
()¶ Return the
<.RequestDispatcher>
used by instance.Returns: a <.RequestDispatcher>
-
offer
(messages)¶ Offer messages to be queued.
Depending on the policies, if any, the messages will be queued if it is possible to do so without violating capacity restrictions.
Parameters: messages – a list of messages to be queued Raises: ArgumentException
if messages is None or empty
-
queue
(messages)¶ Add the messages to the outgoing message queue if it is possible to do so without violating capacity restrictions. :param messages: queue a single or list of iotcs.shared.Message sublclass instances :raises:
StateException
if all the messages cannot be added to the queueor if messages is None or empty
-
classmethod
removeMessageDispatcher
(dcd)¶ Remove the instance of
MessageDispatcher
for dcd.After calling removeMessageDispatcher, a call to
getMessageDispatcher()
will return None. This is an implementation detail and is called from the implementation of the virtualization API.Parameters: dcd – is a DirectlyConnectedDevice
orGateway
connected device :return: theMessageDispatcher
for dcd or None
-
classmethod
setMessageDispatcher
(dcd, messageDispatcher)¶ Set the instance of a
MessageDispatcher
for the dcd.This is an implementation detail and is called from the implementation of the virtualization API.
Parameters: dcd – is a DirectlyConnectedDevice
orGateway
connected device :param messageDispatcher: is theMessageDispatcher
for dcd.
-
setOnDelivery
(callback)¶ Set a callback to be notified if message is successfully delivered. :param callback: An instance of iotcs.device.MessageDispatcher.DeliveryCallback.
If callback is None the existing callback will be removed.
-
setOnError
(callback)¶ Set a callback to be notified if there is an error in sending messages. :param callback: An instance of iotcs.device.MessageDispatcher.ErrorCallback.
If callback is None the existing callback will be removed.
-
class
-
class
iotcs.client.device.util.
MessageDispatcherImpl
(dcd)¶ Bases:
iotcs.client.device.util.MessageDispatcher
The MessageDispatcher queues messages for automatic dispatching to the for the Oracle IoT Cloud Service. The MessageDispatcher prioritizes message dispatching to ensure high priority messages are dispatched ahead of lower priority messages. There is one MessageDispatcher instance per client.
-
BASIC_NUMBER_OF_RETRIES_PROPERTY
= 'dispatcher_basic_number_of_retries'¶
-
class
Counters
(md)¶ Bases:
object
Class to maintain counters
-
counters
¶
-
countersLock
¶
-
md
¶
-
reset
()¶
-
toJson
()¶
-
update
(counter, increment)¶
-
-
class
CountersHandler
(counters)¶ Bases:
iotcs.messaging.client.device.util.RequestHandler
-
COUNTERS_URL
= 'deviceModels/urn:oracle:iot:dcd:capability:message_dispatcher/counters'¶
-
counters
¶
-
handleRequest
(request)¶ Handle the
iotcs.shared.message.RequestMessage
.After handling the reqest from the IoT server, return a
iotcs.shared.message.ResponseMessage
:param requestMessage: A device request. :return:iotcs.shared.message.ResponseMessage
-
-
DEFAULT_SETTLE_TIME
= 10000¶
-
DEVICE_MESSAGING_SECTION
= 'device_messaging'¶
-
DISABLE_LONG_POLLING_PROPERTY
= 'disable_long_polling'¶
-
DISPATCHER_THREAD_POOL_SIZE_PROPERTY
= 'request_dispatcher_thread_pool_size'¶
-
class
Dispatcher
(md, requestMessage, settled=True)¶ Bases:
threading.Thread
-
md
¶
-
requestMessage
¶
-
run
()¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
-
settled
¶
-
-
class
ErrorDispatcher
(callback, messageList, exception)¶ Bases:
threading.Thread
Dispatch send errors in a thread.
Parameters: - callback – An instance of iotcs.device.MessageDispatcher.ErrorCallback
- messageList – The list of messages that could not be delivered
- exception – The Exception that occured dispatching the messages
-
errorCallback
¶
-
exception
¶
-
messageList
¶
-
run
()¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
-
JOINTIMEOUT
= 5¶
-
MAXIMUM_MESSAGES_PER_CONNECTION_PROPERTY
= 'dispatcher_max_messages_per_connection'¶
-
MAXIMUM_MESSAGES_TO_QUEUE_PROPERTY
= 'dispatcher_max_queue_size'¶
-
MESSAGE_DISPATCHER_BACKOFF_PROPERTY
= 'message_dispatcher_backoff'¶
-
MESSAGE_QUEUE_WAITTIME_PROPERTY
= 'message_queue_waittime'¶
-
POLLING_INTERVAL_PROPERTY
= 'dispatcher_polling_interval'¶
-
class
PendingRequestProcessor
(md, settleTime)¶ Bases:
threading.Thread
-
MAX_WAIT_TIME
= 1000¶
-
averageWaitTime
¶
-
md
¶
-
run
()¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
-
running
¶
-
settleTime
¶
-
settled
¶
-
timeZero
¶
-
-
class
PollingIntervalHandler
(md)¶ Bases:
iotcs.messaging.client.device.util.RequestHandler
-
POLLING_INTERVAL_URL
= 'deviceModels/urn:oracle:iot:dcd:capability:message_dispatcher/pollingInterval'¶
-
handleRequest
(request)¶ Handle the
iotcs.shared.message.RequestMessage
.After handling the reqest from the IoT server, return a
iotcs.shared.message.ResponseMessage
:param requestMessage: A device request. :return:iotcs.shared.message.ResponseMessage
-
md
¶
-
-
class
Receiver
(md)¶ Bases:
threading.Thread
-
counters
¶
-
md
¶
-
pendingRequestProcessor
¶
-
run
()¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
-
running
¶
-
settleTime
¶
-
settled
¶
-
timeZero
¶
-
-
class
ResetHandler
(counters)¶ Bases:
iotcs.messaging.client.device.util.RequestHandler
-
RESET_URL
= 'deviceModels/urn:oracle:iot:dcd:capability:message_dispatcher/reset'¶
-
counters
¶
-
handleRequest
(request)¶ Handle the
iotcs.shared.message.RequestMessage
.After handling the reqest from the IoT server, return a
iotcs.shared.message.ResponseMessage
:param requestMessage: A device request. :return:iotcs.shared.message.ResponseMessage
-
-
SETTLE_TIME_PROPERTY
= 'dispatcher_settle_time'¶
-
class
Transmitter
(md)¶ Bases:
threading.Thread
-
RFC1123_FORMAT
¶
-
attempt
¶
-
backoff
¶
-
calculateBackoff
(retryafter)¶ Calculate the new value of ‘backoff’.
If backoff > 0, then we are already backing off and no adjustment is made. Only adjust backoff if backoff less than or equal to zero. A backoff value of less than or equal to zero means we are either not currently backing off, or the backoff period has expired. This method should only be called from the ‘send’ method when handling an IOException. :return:
-
counters
¶
-
delayinmillis
¶
-
fib
¶
-
getMessagesToSend
(pendingMessages, receievedNewAlert)¶ Get messages to send from the pendingMessages list.
After this method returns, there may still be messages in the pendingMessages list. Left over messages are messages that could not be sent because of an in-process storage object upload, or messages that could not be sent because of we’re backing off (due to ‘HTTP 503: server busy’).
‘receivedNewAlert’ is a flag to indicate that at a new alert message was queued. If we are backing off, and there aren’t any new alert messages, then this method returns an empty list.
Parameters: - pendingMessages –
- newAlert –
Returns:
-
md
¶
-
messagesSent
(messages)¶ These messages were successfully delivered to the server.
Update state and notify callbacks
Parameters: messages – Returns:
-
run
()¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
-
running
¶
-
send
(pendingMessages, newAlert)¶ Parameters: - pendingMessages –
- newAlert –
Returns:
-
updateDiagnosticsData
(messages)¶ Parameters: messages – list of Message Returns:
-
-
addStorageObjectDependency
(storageObject, clientMsgId)¶
-
backoffDelay
¶
-
basicNumberOfRetries
¶
-
close
()¶
-
closeLock
¶
-
closed
¶
-
contentLock
¶
-
contentMap
¶
-
counters
¶
-
countersHandler
¶
-
deliveryCallback
¶
-
deliveryCallbackLock
¶
-
deviceClient
¶
-
diagnosticsHandler
¶
-
dispatchDeliveryCallback
(messages)¶
-
dispatchError
(messageList, exception)¶ Dispatch a messaging error to
MessageDispatcher.ErrorCallback
. # noqa: D400,E501Parameters: - messageList – list of messages with errors
- exception – an Exception that caused the error.
-
endpointId
¶
-
errorCallback
¶
-
errorCallbackLock
¶
-
failedContentIds
¶
-
getLogger
(fromclass=None)¶ Return the iotcs.device.impl.MessageDispatcherImpl logger.
Parameters: fromclass – add fromclass to the logger name.
-
getMaximumMessagesPerConnection
()¶
-
getPollingInterval
()¶
-
getQueueCapacity
()¶
-
getQueueSize
()¶
-
getRequestDispatcher
()¶ Return the
<.RequestDispatcher>
used by instance.Returns: a <.RequestDispatcher>
-
getWaitOnReconnect
()¶
-
incrementQueueCapacity
(increment)¶
-
isClosed
()¶ Return True if message dispatcher has been closed.
-
isContentDependent
(clientId)¶
-
isRequestClose
()¶
-
maximumMessagesPerConnection
¶
-
maximumQueueSize
¶
-
messageQueueWaittime
¶
-
messageQueued
¶
-
messageSent
¶
-
offer
(messages)¶ Offer messages to be queued.
Depending on the policies, if any, the messages will be queued if it is possible to do so without violating capacity restrictions.
Parameters: messages – a list of messages to be queued Raises: ArgumentException
if messages is None or empty
-
outgoingMessageQueue
¶
-
pendingQueueLock
¶
-
pendingRequestMessages
¶
-
pendingTrigger
¶
-
pollingInterval
¶
-
pollingIntervalHandler
¶
-
pollingIntervalLock
¶
-
queue
(messages)¶ Add the messages to the outgoing message queue if it is possible to do so without violating capacity restrictions. :param messages: queue a single or list of iotcs.shared.Message sublclass instances :raises:
StateException
if all the messages cannot be added to the queueor if messages is None or empty
-
queueCapacity
¶
-
queueCapacityLock
¶
-
receiveLock
¶
-
receiveThread
¶
-
removeStorageObjectDependency
(storageObject)¶
-
requestClose
¶
-
requestDispatcher
¶
-
resetHandler
¶
-
sendLock
¶
-
setOnDelivery
(onDelivery)¶ Set a callback to be notified if message is successfully delivered. :param callback: An instance of iotcs.device.MessageDispatcher.DeliveryCallback.
If callback is None the existing callback will be removed.
-
setOnError
(onError)¶ Set a callback to be notified if there is an error in sending messages. :param callback: An instance of iotcs.device.MessageDispatcher.ErrorCallback.
If callback is None the existing callback will be removed.
-
setPollingInterval
(value)¶
-
setWaitOnReconnect
(value)¶
-
setWaitOnReconnectAndNotify
(value)¶
-
testConnectivityHandler
¶
-
transmitThread
¶
-
useLongPolling
¶
-
waitOnReconnect
¶
-
waitOnReconnectLock
¶
-