Class: OCI::Streaming::StreamClient

Inherits:
Object
Defined in:
lib/oci/streaming/stream_client.rb

Overview

The API for the Streaming Service.

Constructor Details

#initialize(config: nil, endpoint: nil, signer: nil, proxy_settings: nil, retry_config: nil) ⇒ StreamClient

Creates a new StreamClient. If a config is not specified, the global OCI.config is used.

This client is not thread-safe.

Parameters:

  • config (Config) (defaults to: nil)

    A Config object.

  • endpoint (String) (defaults to: nil)

    The fully qualified endpoint URL. Although the default is nil, the constructor raises an error if no endpoint is given.

  • signer (OCI::BaseSigner) (defaults to: nil)

    A signer implementation which can be used by this client. If this is not provided then a signer will be constructed via the provided config. One use case of this parameter is instance principals authentication, so that the instance principals signer can be provided to the client

  • proxy_settings (OCI::ApiClientProxySettings) (defaults to: nil)

    If your environment requires a proxy server for outgoing HTTP requests, provide the proxy details in this parameter.

  • retry_config (OCI::Retry::RetryConfig) (defaults to: nil)

    The retry configuration for this service client. This represents the default retry configuration to apply across all operations. This can be overridden on a per-operation basis. The default retry configuration value is nil, which means that an operation will not perform any retries



# File 'lib/oci/streaming/stream_client.rb', line 43

def initialize(config: nil, endpoint: nil, signer: nil, proxy_settings: nil, retry_config: nil)
  raise 'A fully qualified endpoint URL must be defined' unless endpoint

  @endpoint = endpoint + '/20180418'

  # If the signer is an InstancePrincipalsSecurityTokenSigner or SecurityTokenSigner and no config was supplied (they are self-sufficient signers)
  # then create a dummy config to pass to the ApiClient constructor. If customers wish to create a client which uses instance principals
  # and has config (either populated programmatically or loaded from a file), they must construct that config themselves and then
  # pass it to this constructor.
  #
  # If there is no signer (or the signer is not an instance principals signer) and no config was supplied, this is not valid
  # so try and load the config from the default file.
  config = OCI::Config.validate_and_build_config_with_signer(config, signer)

  if signer.nil?
    signer = OCI::Signer.new(
      config.user,
      config.fingerprint,
      config.tenancy,
      config.key_file,
      pass_phrase: config.pass_phrase,
      private_key_content: config.key_content
    )
  end

  @api_client = OCI::ApiClient.new(config, signer, proxy_settings: proxy_settings)
  @retry_config = retry_config
  logger.info "StreamClient endpoint set to '#{@endpoint}'." if logger
end
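
The endpoint handling in the constructor above can be reproduced in isolation. The following is a minimal sketch, not SDK code: `versioned_endpoint` is a hypothetical helper and the host name is a placeholder. It only mirrors the two constructor lines that validate the endpoint and append the API version path.

```ruby
# Hypothetical helper mirroring the constructor's endpoint handling.
# It is not part of the OCI SDK.
API_VERSION_PATH = '/20180418'.freeze

def versioned_endpoint(endpoint)
  # The constructor raises when no endpoint is given, even though the
  # keyword argument defaults to nil.
  raise 'A fully qualified endpoint URL must be defined' unless endpoint

  endpoint + API_VERSION_PATH
end

# Placeholder host; substitute the messages endpoint of your own stream.
puts versioned_endpoint('https://streaming.example.invalid')
```

Because the version is appended by plain string concatenation, an endpoint passed with a trailing slash would end up with a double slash before the version segment.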

Instance Attribute Details

#api_client ⇒ OCI::ApiClient (readonly)

Client used to make HTTP requests.

Returns:

  • (OCI::ApiClient)



# File 'lib/oci/streaming/stream_client.rb', line 13

def api_client
  @api_client
end

#endpoint ⇒ String (readonly)

Fully qualified endpoint URL

Returns:

  • (String)


# File 'lib/oci/streaming/stream_client.rb', line 17

def endpoint
  @endpoint
end

#retry_config ⇒ OCI::Retry::RetryConfig (readonly)

The default retry configuration to apply to all operations in this service client. This can be overridden on a per-operation basis. The default value is nil, which means that an operation will not perform any retries.

Returns:

  • (OCI::Retry::RetryConfig)



# File 'lib/oci/streaming/stream_client.rb', line 23

def retry_config
  @retry_config
end
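
The interaction between this client-level default and the per-operation :retry_config option can be sketched as follows. This is an illustration under assumptions, not the SDK's implementation: `resolve_retry_config` is a hypothetical name, and plain symbols stand in for OCI::Retry::RetryConfig objects. The point is the difference between an absent key and an explicit nil.

```ruby
# Hypothetical stand-in for the per-operation lookup; the method name and
# the symbols used as configurations are illustrative only.
def resolve_retry_config(client_default, opts = {})
  # A missing key falls back to the client-level default ...
  return client_default unless opts.key?(:retry_config)

  # ... while an explicit value, including nil, wins over the default.
  opts[:retry_config]
end

default = :client_level_config

p resolve_retry_config(default)                         # no key: default applies
p resolve_retry_config(default, retry_config: :custom)  # per-operation override
p resolve_retry_config(default, retry_config: nil)      # explicit nil: no retries
```

Checking for the key with `key?` rather than testing the value for truthiness is what lets a caller switch retries off for a single call.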

Instance Method Details

#consumer_commit(stream_id, cursor, opts = {}) ⇒ Response

Provides a mechanism to manually commit offsets when not using commit-on-get consumer semantics. This commits the offsets associated with the provided cursor, extends the timeout on each of the affected partitions, and returns an updated cursor.

Parameters:

  • stream_id (String)

    The OCID of the stream.

  • cursor (String)

    The group-cursor representing the offsets of the group. This cursor is retrieved from the CreateGroupCursor API call.

  • opts (Hash) (defaults to: {})

    the optional parameters

Options Hash (opts):

  • :retry_config (OCI::Retry::RetryConfig)

    The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by #retry_config will be used. If an explicit nil value is provided then the operation will not retry

  • :opc_request_id (String)

    The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

Returns:

  • (Response)

    A Response object with data of type OCI::Streaming::Models::Cursor



# File 'lib/oci/streaming/stream_client.rb', line 98

def consumer_commit(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#consumer_commit.' if logger

  raise "Missing the required parameter 'stream_id' when calling consumer_commit." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling consumer_commit." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/commit'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#consumer_commit') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end

#consumer_heartbeat(stream_id, cursor, opts = {}) ⇒ Response

Allows long-running processes to extend the timeout on partitions reserved by a consumer instance.

Parameters:

  • stream_id (String)

    The OCID of the stream.

  • cursor (String)

    The group-cursor representing the offsets of the group. This cursor is retrieved from the CreateGroupCursor API call.

  • opts (Hash) (defaults to: {})

    the optional parameters

Options Hash (opts):

  • :retry_config (OCI::Retry::RetryConfig)

    The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by #retry_config will be used. If an explicit nil value is provided then the operation will not retry

  • :opc_request_id (String)

    The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

Returns:

  • (Response)

    A Response object with data of type OCI::Streaming::Models::Cursor



# File 'lib/oci/streaming/stream_client.rb', line 159

def consumer_heartbeat(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#consumer_heartbeat.' if logger

  raise "Missing the required parameter 'stream_id' when calling consumer_heartbeat." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling consumer_heartbeat." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/heartbeat'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#consumer_heartbeat') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end

#create_cursor(stream_id, create_cursor_details, opts = {}) ⇒ Response

Creates a cursor. Cursors are used to consume a stream, starting from a specific point in the partition and going forward from there. You can create a cursor based on an offset, a time, the trim horizon, or the most recent message in the stream. The trim horizon is the oldest message inside the retention period boundary, so a cursor based on it effectively lets you consume all messages in the stream. A cursor based on the most recent message allows consumption of only those messages that are added to the stream after you create the cursor. Cursors expire five minutes after you receive them from the service.

Parameters:

  • stream_id (String)

    The OCID of the stream.

  • create_cursor_details (OCI::Streaming::Models::CreateCursorDetails)

    The information used to create the cursor.

  • opts (Hash) (defaults to: {})

    the optional parameters

Options Hash (opts):

  • :retry_config (OCI::Retry::RetryConfig)

    The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by #retry_config will be used. If an explicit nil value is provided then the operation will not retry

  • :opc_request_id (String)

    The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

Returns:

  • (Response)

    A Response object with data of type OCI::Streaming::Models::Cursor



# File 'lib/oci/streaming/stream_client.rb', line 223

def create_cursor(stream_id, create_cursor_details, opts = {})
  logger.debug 'Calling operation StreamClient#create_cursor.' if logger

  raise "Missing the required parameter 'stream_id' when calling create_cursor." if stream_id.nil?
  raise "Missing the required parameter 'create_cursor_details' when calling create_cursor." if create_cursor_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/cursors'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(create_cursor_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#create_cursor') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
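
The four starting points named in the description (offset, time, trim horizon, most recent message) and the five-minute lifetime can be sketched without the SDK. The helpers below are hypothetical and build a plain Hash rather than an OCI::Streaming::Models::CreateCursorDetails object. The type strings and field names are assumed to match that model, so check them against the SDK before relying on them.

```ruby
# Cursor lifetime stated in the description above.
CURSOR_TTL_SECONDS = 5 * 60

# Hypothetical helper: builds a plain Hash describing the cursor request.
# The type strings and field names are assumptions about the
# CreateCursorDetails model, not values taken from this page.
def cursor_details(partition:, type:, offset: nil, time: nil)
  case type
  when 'AT_OFFSET', 'AFTER_OFFSET'
    raise ArgumentError, "#{type} needs an offset" if offset.nil?
  when 'AT_TIME'
    raise ArgumentError, 'AT_TIME needs a time' if time.nil?
  when 'TRIM_HORIZON', 'LATEST'
    # No extra field: the position is implied by the type.
  else
    raise ArgumentError, "unknown cursor type #{type}"
  end

  { partition: partition, type: type, offset: offset, time: time }.compact
end

# True once a cursor received at received_at can no longer be used.
def cursor_expired?(received_at, now = Time.now)
  now - received_at >= CURSOR_TTL_SECONDS
end

p cursor_details(partition: '0', type: 'TRIM_HORIZON')
p cursor_details(partition: '0', type: 'AT_OFFSET', offset: 42)
```

Recording the time at which a cursor was received makes it possible to request a new one before the old one expires, instead of reacting to a failed call.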

#create_group_cursor(stream_id, create_group_cursor_details, opts = {}) ⇒ Response

Creates a group-cursor.

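Parameters:

  • stream_id (String)

    The OCID of the stream.

  • create_group_cursor_details (OCI::Streaming::Models::CreateGroupCursorDetails)

    The information used to create the group-cursor.

  • opts (Hash) (defaults to: {})

    the optional parameters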

Options Hash (opts):

  • :retry_config (OCI::Retry::RetryConfig)

    The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by #retry_config will be used. If an explicit nil value is provided then the operation will not retry

  • :opc_request_id (String)

    The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

Returns:

  • (Response)

    A Response object with data of type OCI::Streaming::Models::Cursor



# File 'lib/oci/streaming/stream_client.rb', line 282

def create_group_cursor(stream_id, create_group_cursor_details, opts = {})
  logger.debug 'Calling operation StreamClient#create_group_cursor.' if logger

  raise "Missing the required parameter 'stream_id' when calling create_group_cursor." if stream_id.nil?
  raise "Missing the required parameter 'create_group_cursor_details' when calling create_group_cursor." if create_group_cursor_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/groupCursors'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(create_group_cursor_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#create_group_cursor') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end

#get_group(stream_id, group_name, opts = {}) ⇒ Response

Returns the current state of a consumer group.

Parameters:

  • stream_id (String)

    The OCID of the stream.

  • group_name (String)

    The name of the consumer group.

  • opts (Hash) (defaults to: {})

    the optional parameters

Options Hash (opts):

  • :retry_config (OCI::Retry::RetryConfig)

    The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by #retry_config will be used. If an explicit nil value is provided then the operation will not retry

  • :opc_request_id (String)

    The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

Returns:

  • (Response)

    A Response object with data of type OCI::Streaming::Models::Group



# File 'lib/oci/streaming/stream_client.rb', line 341

def get_group(stream_id, group_name, opts = {})
  logger.debug 'Calling operation StreamClient#get_group.' if logger

  raise "Missing the required parameter 'stream_id' when calling get_group." if stream_id.nil?
  raise "Missing the required parameter 'group_name' when calling get_group." if group_name.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)
  raise "Parameter value for 'group_name' must not be blank" if OCI::Internal::Util.blank_string?(group_name)

  path = '/streams/{streamId}/groups/{groupName}'.sub('{streamId}', stream_id.to_s).sub('{groupName}', group_name.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#get_group') do
    @api_client.call_api(
      :GET,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Group'
    )
  end
  # rubocop:enable Metrics/BlockLength
end

#get_messages(stream_id, cursor, opts = {}) ⇒ Response

Returns messages from the specified stream using the specified cursor as the starting point for consumption. By default, the number of messages returned is undefined, but the service returns as many as possible. To get messages, you must first obtain a cursor using the create_cursor operation. In the response, retrieve the value of the 'opc-next-cursor' header to pass as a parameter to get the next batch of messages in the stream.

Parameters:

  • stream_id (String)

    The OCID of the stream.

  • cursor (String)

    The cursor used to consume the stream.

  • opts (Hash) (defaults to: {})

    the optional parameters

Options Hash (opts):

  • :retry_config (OCI::Retry::RetryConfig)

    The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by #retry_config will be used. If an explicit nil value is provided then the operation will not retry

  • :limit (Integer)

    The maximum number of messages to return. You can specify any value up to 10000. By default, the service returns as many messages as possible. Consider your average message size to help avoid exceeding throughput on the stream.

  • :opc_request_id (String)

    The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

Returns:

  • (Response)

    A Response object with data of type Array<OCI::Streaming::Models::Message>



# File 'lib/oci/streaming/stream_client.rb', line 407

def get_messages(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#get_messages.' if logger

  raise "Missing the required parameter 'stream_id' when calling get_messages." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling get_messages." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/messages'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor
  query_params[:limit] = opts[:limit] if opts[:limit]

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#get_messages') do
    @api_client.call_api(
      :GET,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'Array<OCI::Streaming::Models::Message>'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
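
The cursor-following pattern from the description (read a batch, take the 'opc-next-cursor' header, pass it to the next call) can be sketched with a stub in place of the real client. Everything named `Fake...` below is hypothetical and exists only so the sketch runs without the oci gem. The sketch assumes that the real Response exposes `data` and a `headers` Hash keyed by the lowercase header name.

```ruby
# Stand-in for the SDK response so the sketch runs without the oci gem.
FakeResponse = Struct.new(:data, :headers)

# Hypothetical stub serving canned batches keyed by cursor.
class FakeStreamClient
  def initialize(batches)
    @batches = batches
  end

  def get_messages(_stream_id, cursor, _opts = {})
    messages, next_cursor = @batches.fetch(cursor)
    FakeResponse.new(messages, { 'opc-next-cursor' => next_cursor })
  end
end

# Reads batches until one comes back empty, following opc-next-cursor.
def drain(client, stream_id, cursor, limit: 100)
  collected = []
  loop do
    response = client.get_messages(stream_id, cursor, limit: limit)
    break if response.data.empty?

    collected.concat(response.data)
    cursor = response.headers['opc-next-cursor']
  end
  collected
end

client = FakeStreamClient.new(
  'c0' => [%w[m1 m2], 'c1'],
  'c1' => [%w[m3], 'c2'],
  'c2' => [[], 'c3']
)
p drain(client, 'ocid1.stream.example', 'c0')
```

Stopping at the first empty batch keeps the sketch finite. A long-running consumer would more likely pause and poll again with the latest cursor.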

#logger ⇒ Logger

Returns the logger for this client. May be nil.

Returns:

  • (Logger)

    The logger for this client. May be nil.



# File 'lib/oci/streaming/stream_client.rb', line 75

def logger
  @api_client.config.logger
end

#put_messages(stream_id, put_messages_details, opts = {}) ⇒ Response

Emits messages to a stream. There's no limit to the number of messages in a request, but the total size of a message or request must be 1 MiB or less. The service calculates the partition ID from the message key and stores messages that share a key on the same partition. If a message does not contain a key or if the key is null, the service generates a message key for you. The partition ID cannot be passed as a parameter.

Parameters:

  • stream_id (String)

    The OCID of the stream.

  • put_messages_details (OCI::Streaming::Models::PutMessagesDetails)

    Array of messages to put into the stream.

  • opts (Hash) (defaults to: {})

    the optional parameters

Options Hash (opts):

  • :retry_config (OCI::Retry::RetryConfig)

    The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by #retry_config will be used. If an explicit nil value is provided then the operation will not retry

  • :opc_request_id (String)

    The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

Returns:

  • (Response)

    A Response object with data of type OCI::Streaming::Models::PutMessagesResult



# File 'lib/oci/streaming/stream_client.rb', line 471

def put_messages(stream_id, put_messages_details, opts = {})
  logger.debug 'Calling operation StreamClient#put_messages.' if logger

  raise "Missing the required parameter 'stream_id' when calling put_messages." if stream_id.nil?
  raise "Missing the required parameter 'put_messages_details' when calling put_messages." if put_messages_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/messages'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :exclude_body

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(put_messages_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#put_messages') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::PutMessagesResult'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
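
One way to respect the 1 MiB request ceiling mentioned in the description is to split outgoing messages into batches before calling put_messages. The helper below is a hypothetical sketch, not part of the SDK. It counts only raw key and value bytes, which is an assumption: the service may measure the encoded request instead, so leave headroom in practice.

```ruby
# Request size ceiling stated in the description above (1 MiB).
MAX_REQUEST_BYTES = 1024 * 1024

# Hypothetical helper: groups [key, value] pairs into batches whose combined
# key and value bytes stay within max_bytes. Counting only raw bytes is an
# assumption; encoding overhead is not included.
def batch_messages(messages, max_bytes: MAX_REQUEST_BYTES)
  batches = [[]]
  used = 0

  messages.each do |key, value|
    size = key.to_s.bytesize + value.to_s.bytesize
    raise ArgumentError, "message of #{size} bytes exceeds #{max_bytes}" if size > max_bytes

    # Start a new batch when this message would overflow the current one.
    if used + size > max_bytes
      batches << []
      used = 0
    end
    batches.last << [key, value]
    used += size
  end

  batches.reject(&:empty?)
end

messages = [['k1', 'a' * 600_000], ['k2', 'b' * 600_000], [nil, 'c' * 10]]
p batch_messages(messages).map(&:size)
```

Each resulting batch would then be turned into one PutMessagesDetails object and sent in its own put_messages call.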

#update_group(stream_id, group_name, update_group_details, opts = {}) ⇒ Response

Forcefully changes the current location of a group as a whole, resetting the processing location of all consumers to a particular location in the stream.

Parameters:

  • stream_id (String)

    The OCID of the stream.

  • group_name (String)

    The name of the consumer group.

  • update_group_details (OCI::Streaming::Models::UpdateGroupDetails)

    The information used to modify the group.

  • opts (Hash) (defaults to: {})

    the optional parameters

Options Hash (opts):

  • :retry_config (OCI::Retry::RetryConfig)

    The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by #retry_config will be used. If an explicit nil value is provided then the operation will not retry

  • :opc_request_id (String)

    The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

Returns:

  • (Response)

    A Response object with data of type nil



# File 'lib/oci/streaming/stream_client.rb', line 531

def update_group(stream_id, group_name, update_group_details, opts = {})
  logger.debug 'Calling operation StreamClient#update_group.' if logger

  raise "Missing the required parameter 'stream_id' when calling update_group." if stream_id.nil?
  raise "Missing the required parameter 'group_name' when calling update_group." if group_name.nil?
  raise "Missing the required parameter 'update_group_details' when calling update_group." if update_group_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)
  raise "Parameter value for 'group_name' must not be blank" if OCI::Internal::Util.blank_string?(group_name)

  path = '/streams/{streamId}/groups/{groupName}'.sub('{streamId}', stream_id.to_s).sub('{groupName}', group_name.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(update_group_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#update_group') do
    @api_client.call_api(
      :PUT,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body
    )
  end
  # rubocop:enable Metrics/BlockLength
end