iotcs.client.device.util module

class iotcs.client.device.util.MessageDispatcher

Bases: abc.ABC

The MessageDispatcher queues messages for automatic dispatching.

Messages are dispatched to the Oracle IoT Cloud Service. The MessageDispatcher prioritizes message dispatching to ensure high priority messages are dispatched ahead of lower priority messages. There is one MessageDispatcher instance per client.
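
The priority ordering described above can be illustrated with a small heap-based queue. This is only a sketch of the general technique, not the library's implementation: the class PriorityOutbox, its methods, and the convention that a lower number means higher priority are all assumptions made for the example.

```python
import heapq
import itertools


class PriorityOutbox:
    """Hypothetical priority-ordered outgoing queue (not part of iotcs)."""

    def __init__(self):
        self._heap = []
        # A running counter breaks ties so equal priorities stay in FIFO order.
        self._order = itertools.count()

    def put(self, priority, message):
        # Assumption of this sketch: lower number = higher priority.
        heapq.heappush(self._heap, (priority, next(self._order), message))

    def drain(self):
        """Remove and return all queued messages, highest priority first."""
        drained = []
        while self._heap:
            _, _, message = heapq.heappop(self._heap)
            drained.append(message)
        return drained


outbox = PriorityOutbox()
outbox.put(2, "temperature reading")
outbox.put(0, "overheat alert")
outbox.put(2, "humidity reading")
dispatch_order = outbox.drain()
```

The alert is drained first even though it was queued second, and the two readings keep their arrival order.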

class DeliveryCallback

Bases: abc.ABC

A callback interface for successful delivery of messages.

delivered(messages)

Notify that messages have been successfully dispatched. This callback indicates that the messages have been delivered to, and accepted by, the server and are no longer queued.

Parameters:messages – the delivered messages

class ErrorCallback

Bases: abc.ABC

A callback interface called if errors occur during the delivery of messages.

failed(messages, exception)

Notify that an error occurred when attempting to deliver messages to the server. This callback indicates that the messages have not been delivered to the server and are no longer queued.

Parameters:
  • messages – the messages that were not delivered
  • exception – the exception that was raised when attempting to deliver the messages
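
A minimal sketch of implementing the two callback interfaces follows. The base classes here are local stand-ins that mirror the documented method names delivered(messages) and failed(messages, exception); whether the real methods are declared abstract is an assumption, and RecordingDelivery and RecordingError are hypothetical names.

```python
import abc


class DeliveryCallback(abc.ABC):
    """Local stand-in for MessageDispatcher.DeliveryCallback (assumption)."""

    @abc.abstractmethod
    def delivered(self, messages):
        """Called with the messages the server accepted."""


class ErrorCallback(abc.ABC):
    """Local stand-in for MessageDispatcher.ErrorCallback (assumption)."""

    @abc.abstractmethod
    def failed(self, messages, exception):
        """Called with the messages that were dropped and the cause."""


class RecordingDelivery(DeliveryCallback):
    def __init__(self):
        self.delivered_messages = []

    def delivered(self, messages):
        self.delivered_messages.extend(messages)


class RecordingError(ErrorCallback):
    def __init__(self):
        self.failures = []

    def failed(self, messages, exception):
        # Failed messages are no longer queued, so keep a copy for a retry.
        self.failures.append((list(messages), exception))


on_delivery = RecordingDelivery()
on_error = RecordingError()
on_delivery.delivered(["m1", "m2"])
on_error.failed(["m3"], OSError("connection reset"))
```

Because failed messages are no longer queued, an error callback is the natural place to decide whether to queue them again.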
MESSAGE_DISPATCHER_URN = 'urn:oracle:iot:dcd:capability:message_dispatcher'
close()
classmethod getMessageDispatcher(dcd)

Return the instance of a MessageDispatcher for the dcd.

This is an implementation detail and is called from the implementation of the virtualization API.

Parameters:dcd – a DirectlyConnectedDevice or Gateway connected device
Returns:a MessageDispatcherImpl for dcd

getRequestDispatcher()

Return the RequestDispatcher used by this instance.

Returns:a RequestDispatcher
offer(messages)

Offer messages to be queued.

Depending on the policies, if any, the messages will be queued if it is possible to do so without violating capacity restrictions.

Parameters:messages – a list of messages to be queued
Raises:ArgumentException if messages is None or empty
queue(messages)

Add the messages to the outgoing message queue if it is possible to do so without violating capacity restrictions.

Parameters:messages – a single iotcs.shared.Message subclass instance, or a list of such instances, to queue
Raises:StateException if all the messages cannot be added to the queue or if messages is None or empty
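
The capacity check in queue can be sketched as follows. This is an illustration under stated assumptions, not the library's code: BoundedOutbox and the local StateException are stand-ins, and treating the call as all-or-nothing is one reading of "if all the messages cannot be added".

```python
class StateException(Exception):
    """Local stand-in for the StateException named in the documentation."""


class BoundedOutbox:
    """Hypothetical bounded queue; not the library's MessageDispatcherImpl."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = []

    def queue(self, messages):
        if messages is None:
            raise StateException("messages is None")
        # Accept either a single message or a list of messages.
        if not isinstance(messages, (list, tuple)):
            messages = [messages]
        if len(messages) == 0:
            raise StateException("messages is empty")
        # Assumption: all-or-nothing, so a partial batch is never queued.
        if len(self.items) + len(messages) > self.capacity:
            raise StateException("queue capacity would be exceeded")
        self.items.extend(messages)


outbox = BoundedOutbox(capacity=2)
outbox.queue("a")
try:
    outbox.queue(["b", "c"])  # would bring the total to 3, over capacity
    rejected = False
except StateException:
    rejected = True
```

After the rejected call the queue still holds only the first message.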
classmethod removeMessageDispatcher(dcd)

Remove the instance of MessageDispatcher for dcd.

After calling removeMessageDispatcher, a call to getMessageDispatcher() will return None. This is an implementation detail and is called from the implementation of the virtualization API.

Parameters:dcd – a DirectlyConnectedDevice or Gateway connected device
Returns:the MessageDispatcher for dcd or None

classmethod setMessageDispatcher(dcd, messageDispatcher)

Set the instance of a MessageDispatcher for the dcd.

This is an implementation detail and is called from the implementation of the virtualization API.

Parameters:
  • dcd – a DirectlyConnectedDevice or Gateway connected device
  • messageDispatcher – the MessageDispatcher for dcd
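
Taken together, getMessageDispatcher, setMessageDispatcher and removeMessageDispatcher behave like a per-device registry. The sketch below shows that pattern with a class-level dictionary. DispatcherRegistry is a hypothetical name, the lock is an assumption, and the real getMessageDispatcher may create an instance on first use, which this sketch does not attempt.

```python
import threading


class DispatcherRegistry:
    """Hypothetical per-device registry; not the library's implementation."""

    _lock = threading.Lock()
    _dispatchers = {}

    @classmethod
    def getMessageDispatcher(cls, dcd):
        with cls._lock:
            return cls._dispatchers.get(dcd)

    @classmethod
    def setMessageDispatcher(cls, dcd, messageDispatcher):
        with cls._lock:
            cls._dispatchers[dcd] = messageDispatcher

    @classmethod
    def removeMessageDispatcher(cls, dcd):
        # Returns the removed dispatcher, or None if there was none.
        with cls._lock:
            return cls._dispatchers.pop(dcd, None)


device = object()      # placeholder for a DirectlyConnectedDevice
dispatcher = object()  # placeholder for a MessageDispatcher
DispatcherRegistry.setMessageDispatcher(device, dispatcher)
found = DispatcherRegistry.getMessageDispatcher(device)
removed = DispatcherRegistry.removeMessageDispatcher(device)
after_removal = DispatcherRegistry.getMessageDispatcher(device)
```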

setOnDelivery(callback)

Set a callback to be notified when messages are successfully delivered.

Parameters:callback – an instance of iotcs.device.MessageDispatcher.DeliveryCallback. If callback is None, the existing callback is removed.
setOnError(callback)

Set a callback to be notified if there is an error in sending messages.

Parameters:callback – an instance of iotcs.device.MessageDispatcher.ErrorCallback. If callback is None, the existing callback is removed.
class iotcs.client.device.util.MessageDispatcherImpl(dcd)

Bases: iotcs.client.device.util.MessageDispatcher

The MessageDispatcher queues messages for automatic dispatching to the Oracle IoT Cloud Service. The MessageDispatcher prioritizes message dispatching to ensure high priority messages are dispatched ahead of lower priority messages. There is one MessageDispatcher instance per client.

BASIC_NUMBER_OF_RETRIES_PROPERTY = 'dispatcher_basic_number_of_retries'
class Counters(md)

Bases: object

Class to maintain counters

counters
countersLock
md
reset()
toJson()
update(counter, increment)
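
The member names above (counters, countersLock, reset, toJson, update) suggest a lock-protected dictionary of counters. The following is a sketch of that pattern only. CountersSketch takes no md argument, and the counter name totalMessagesSent is invented for the example.

```python
import json
import threading


class CountersSketch:
    """Hypothetical thread-safe counter store; not the library's Counters."""

    def __init__(self):
        self.countersLock = threading.Lock()
        self.counters = {}

    def update(self, counter, increment):
        # The read-modify-write happens under the lock to avoid lost updates.
        with self.countersLock:
            self.counters[counter] = self.counters.get(counter, 0) + increment

    def reset(self):
        with self.countersLock:
            for name in self.counters:
                self.counters[name] = 0

    def toJson(self):
        with self.countersLock:
            return json.dumps(self.counters, sort_keys=True)


def send_many(stats, count):
    for _ in range(count):
        stats.update("totalMessagesSent", 1)  # hypothetical counter name


stats = CountersSketch()
workers = [threading.Thread(target=send_many, args=(stats, 1000)) for _ in range(4)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
snapshot = json.loads(stats.toJson())
stats.reset()
```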
class CountersHandler(counters)

Bases: iotcs.messaging.client.device.util.RequestHandler

COUNTERS_URL = 'deviceModels/urn:oracle:iot:dcd:capability:message_dispatcher/counters'
counters
handleRequest(request)

Handle the iotcs.shared.message.RequestMessage.

After handling the request from the IoT server, return an iotcs.shared.message.ResponseMessage.

Parameters:request – a device request
Returns:iotcs.shared.message.ResponseMessage

DEFAULT_SETTLE_TIME = 10000
DEVICE_MESSAGING_SECTION = 'device_messaging'
DISABLE_LONG_POLLING_PROPERTY = 'disable_long_polling'
DISPATCHER_THREAD_POOL_SIZE_PROPERTY = 'request_dispatcher_thread_pool_size'
class Dispatcher(md, requestMessage, settled=True)

Bases: threading.Thread

md
requestMessage
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

settled
class ErrorDispatcher(callback, messageList, exception)

Bases: threading.Thread

Dispatch send errors in a thread.

Parameters:
  • callback – An instance of iotcs.device.MessageDispatcher.ErrorCallback
  • messageList – The list of messages that could not be delivered
  • exception – The Exception that occurred dispatching the messages
errorCallback
exception
messageList
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
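
Dispatching an error on its own thread keeps a slow or blocking callback from stalling the sender. A minimal sketch of that idea is shown here, assuming that run() simply forwards to the callback's failed method. ErrorDispatcherSketch and CollectingErrorCallback are hypothetical names, and the None check is an assumption.

```python
import threading


class ErrorDispatcherSketch(threading.Thread):
    """Hypothetical thread that reports a send error to a callback."""

    def __init__(self, callback, messageList, exception):
        super().__init__(daemon=True)
        self.errorCallback = callback
        self.messageList = messageList
        self.exception = exception

    def run(self):
        # Assumption: with no callback registered there is nothing to do.
        if self.errorCallback is not None:
            self.errorCallback.failed(self.messageList, self.exception)


class CollectingErrorCallback:
    def __init__(self):
        self.seen = []

    def failed(self, messages, exception):
        self.seen.append((messages, str(exception)))


collector = CollectingErrorCallback()
worker = ErrorDispatcherSketch(collector, ["m1", "m2"], RuntimeError("send failed"))
worker.start()
worker.join(timeout=5)
```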

JOINTIMEOUT = 5
MAXIMUM_MESSAGES_PER_CONNECTION_PROPERTY = 'dispatcher_max_messages_per_connection'
MAXIMUM_MESSAGES_TO_QUEUE_PROPERTY = 'dispatcher_max_queue_size'
MESSAGE_DISPATCHER_BACKOFF_PROPERTY = 'message_dispatcher_backoff'
MESSAGE_QUEUE_WAITTIME_PROPERTY = 'message_queue_waittime'
POLLING_INTERVAL_PROPERTY = 'dispatcher_polling_interval'
class PendingRequestProcessor(md, settleTime)

Bases: threading.Thread

MAX_WAIT_TIME = 1000
averageWaitTime
md
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

running
settleTime
settled
timeZero
class PollingIntervalHandler(md)

Bases: iotcs.messaging.client.device.util.RequestHandler

POLLING_INTERVAL_URL = 'deviceModels/urn:oracle:iot:dcd:capability:message_dispatcher/pollingInterval'
handleRequest(request)

Handle the iotcs.shared.message.RequestMessage.

After handling the request from the IoT server, return an iotcs.shared.message.ResponseMessage.

Parameters:request – a device request
Returns:iotcs.shared.message.ResponseMessage

md
class Receiver(md)

Bases: threading.Thread

counters
md
pendingRequestProcessor
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

running
settleTime
settled
timeZero
class ResetHandler(counters)

Bases: iotcs.messaging.client.device.util.RequestHandler

RESET_URL = 'deviceModels/urn:oracle:iot:dcd:capability:message_dispatcher/reset'
counters
handleRequest(request)

Handle the iotcs.shared.message.RequestMessage.

After handling the request from the IoT server, return an iotcs.shared.message.ResponseMessage.

Parameters:request – a device request
Returns:iotcs.shared.message.ResponseMessage

SETTLE_TIME_PROPERTY = 'dispatcher_settle_time'
class Transmitter(md)

Bases: threading.Thread

RFC1123_FORMAT
attempt
backoff
calculateBackoff(retryafter)

Calculate the new value of ‘backoff’.

If backoff > 0, then we are already backing off and no adjustment is made. Only adjust backoff if backoff is less than or equal to zero. A backoff value of less than or equal to zero means we are either not currently backing off, or the backoff period has expired. This method should only be called from the ‘send’ method when handling an IOException.
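
The guard described above (leave a positive backoff untouched, recompute otherwise) can be sketched as a pure function. How the new value is computed is not documented here. Honouring a server-supplied retry delay and growing the delay along the Fibonacci sequence are assumptions suggested only by the retryafter parameter and the fib attribute, and next_backoff, fibonacci and base_delay_ms are hypothetical names.

```python
def fibonacci(n):
    """Return the n-th Fibonacci number, with fibonacci(1) == fibonacci(2) == 1."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def next_backoff(current_backoff, attempt, retry_after_seconds=None, base_delay_ms=1000):
    """Return the backoff in milliseconds to use after a failed send."""
    # Already backing off: keep the current value, as the documentation states.
    if current_backoff > 0:
        return current_backoff
    # Assumption: a server-supplied retry delay takes precedence.
    if retry_after_seconds is not None:
        return retry_after_seconds * 1000
    # Assumption: otherwise grow the delay with the attempt number.
    return fibonacci(attempt) * base_delay_ms


unchanged = next_backoff(5000, attempt=3)
grown = next_backoff(0, attempt=4)
from_server = next_backoff(-1, attempt=1, retry_after_seconds=30)
```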

counters
delayinmillis
fib
getMessagesToSend(pendingMessages, receievedNewAlert)

Get messages to send from the pendingMessages list.

After this method returns, there may still be messages in the pendingMessages list. Leftover messages are messages that could not be sent because of an in-process storage object upload, or because we are backing off (due to ‘HTTP 503: server busy’).

‘receivedNewAlert’ is a flag to indicate that a new alert message was queued. If we are backing off, and there aren’t any new alert messages, then this method returns an empty list.

Parameters:
  • pendingMessages
  • newAlert
Returns:
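
The selection rules described for getMessagesToSend can be sketched as follows. The dictionary-based message shape, the blocked_ids set standing in for messages that wait on a storage object upload, and the choice to send only alerts while backing off are all assumptions of this example. select_messages is a hypothetical helper, not the library's method.

```python
def select_messages(pending, backing_off, received_new_alert, blocked_ids=()):
    """Return the messages to send now; leftovers stay in `pending`."""
    # Documented rule: backing off with no new alert means nothing is sent.
    if backing_off and not received_new_alert:
        return []
    to_send, left_over = [], []
    for message in pending:
        waiting_on_upload = message["id"] in blocked_ids
        # Assumption: while backing off, only alert messages go out.
        held_by_backoff = backing_off and not message["alert"]
        if waiting_on_upload or held_by_backoff:
            left_over.append(message)
        else:
            to_send.append(message)
    pending[:] = left_over  # mutate in place so the caller sees the leftovers
    return to_send


pending = [
    {"id": "a", "alert": False},
    {"id": "b", "alert": True},
    {"id": "c", "alert": False},
]
nothing = select_messages(pending, backing_off=True, received_new_alert=False)
alerts_only = select_messages(pending, backing_off=True, received_new_alert=True)
rest = select_messages(pending, backing_off=False, received_new_alert=False,
                       blocked_ids={"c"})
```

After the three calls, only message "c" remains pending because it is still treated as waiting on an upload.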

md
messagesSent(messages)

These messages were successfully delivered to the server.

Update state and notify callbacks

Parameters:messages
Returns:
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

running
send(pendingMessages, newAlert)
Parameters:
  • pendingMessages
  • newAlert
Returns:

updateDiagnosticsData(messages)
Parameters:messages – list of Message
Returns:
addStorageObjectDependency(storageObject, clientMsgId)
backoffDelay
basicNumberOfRetries
close()
closeLock
closed
contentLock
contentMap
counters
countersHandler
deliveryCallback
deliveryCallbackLock
deviceClient
diagnosticsHandler
dispatchDeliveryCallback(messages)
dispatchError(messageList, exception)

Dispatch a messaging error to MessageDispatcher.ErrorCallback.

Parameters:
  • messageList – list of messages with errors
  • exception – an Exception that caused the error.
endpointId
errorCallback
errorCallbackLock
failedContentIds
getLogger(fromclass=None)

Return the iotcs.device.impl.MessageDispatcherImpl logger.

Parameters:fromclass – add fromclass to the logger name.
getMaximumMessagesPerConnection()
getPollingInterval()
getQueueCapacity()
getQueueSize()
getRequestDispatcher()

Return the RequestDispatcher used by this instance.

Returns:a RequestDispatcher
getWaitOnReconnect()
incrementQueueCapacity(increment)
isClosed()

Return True if message dispatcher has been closed.

isContentDependent(clientId)
isRequestClose()
maximumMessagesPerConnection
maximumQueueSize
messageQueueWaittime
messageQueued
messageSent
offer(messages)

Offer messages to be queued.

Depending on the policies, if any, the messages will be queued if it is possible to do so without violating capacity restrictions.

Parameters:messages – a list of messages to be queued
Raises:ArgumentException if messages is None or empty
outgoingMessageQueue
pendingQueueLock
pendingRequestMessages
pendingTrigger
pollingInterval
pollingIntervalHandler
pollingIntervalLock
queue(messages)

Add the messages to the outgoing message queue if it is possible to do so without violating capacity restrictions.

Parameters:messages – a single iotcs.shared.Message subclass instance, or a list of such instances, to queue
Raises:StateException if all the messages cannot be added to the queue or if messages is None or empty
queueCapacity
queueCapacityLock
receiveLock
receiveThread
removeStorageObjectDependency(storageObject)
requestClose
requestDispatcher
resetHandler
sendLock
setOnDelivery(onDelivery)

Set a callback to be notified when messages are successfully delivered.

Parameters:onDelivery – an instance of iotcs.device.MessageDispatcher.DeliveryCallback. If onDelivery is None, the existing callback is removed.
setOnError(onError)

Set a callback to be notified if there is an error in sending messages.

Parameters:onError – an instance of iotcs.device.MessageDispatcher.ErrorCallback. If onError is None, the existing callback is removed.
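
Setting and removing a callback while other threads may be delivering messages is usually done under a lock, in the spirit of the deliveryCallbackLock and errorCallbackLock attributes listed for this class. The sketch below is an assumption about that pattern, not the library's code. CallbackHolder and PrintlessErrorCallback are hypothetical names.

```python
import threading


class CallbackHolder:
    """Hypothetical lock-guarded slot for an error callback."""

    def __init__(self):
        self._lock = threading.Lock()
        self._callback = None

    def setOnError(self, onError):
        # Passing None removes the existing callback.
        with self._lock:
            self._callback = onError

    def notify(self, messages, exception):
        # Copy the reference under the lock, then call outside it so a slow
        # callback cannot block another thread that is changing the callback.
        with self._lock:
            callback = self._callback
        if callback is None:
            return False
        callback.failed(messages, exception)
        return True


class PrintlessErrorCallback:
    def __init__(self):
        self.calls = 0

    def failed(self, messages, exception):
        self.calls += 1


holder = CallbackHolder()
listener = PrintlessErrorCallback()
holder.setOnError(listener)
first = holder.notify(["m1"], RuntimeError("send failed"))
holder.setOnError(None)
second = holder.notify(["m2"], RuntimeError("send failed"))
```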
setPollingInterval(value)
setWaitOnReconnect(value)
setWaitOnReconnectAndNotify(value)
testConnectivityHandler
transmitThread
useLongPolling
waitOnReconnect
waitOnReconnectLock