Skip to content

Client

The Message Bug Client library implements the client methods that a process needs in order to interact with the message bus.

Synopsis

use Assure1::MessageBus::Client;

my $MBClient = Assure1::MessageBus::Client($baseDir, \[{OPTIONS}\]);
my $response = $mbClient->ClientRequest(METHOD, \[ARGUMENTS\]);

Constructor

The Message Bus Client library contains the constructor described below.

new

Creates a new client for sending and receiving messages.

new(\%options)

Options

log       -> Assure1::Log instance, omitt to disable logging
config    -> Assure1::Config instance
workqueue -> Optional reference to a Thread::Queue to send incoming messages to for processing, if not given will ignore any incoming broker messages that are not replies.
             Incoming messages are queued in the following frozen format (use Storable):
               body          - message body; this can contain any number of formats such as a hash, array, or string
               type          - message type (either "direct" or "failover")
               rawmessage    - raw message info including message properties; use when sending replies through reply() as 'orig_msg'
bindings  -> Additional exchanges to bind to (e.g. EX_THRESHOLD)
timeout   -> Time to wait for response before timing out. Default 30 seconds
server    -> Specific broker to talk to; should be the FQDN. Defaults to to local hostfqdn
blocking  -> set whether to block on ClientRequest and GetResponse. Defaults to 1

Returns

Assure1::MessageBus::Client object

Synopsis

my $MBQueue      = new Thread::Queue;
my $MBClient     = Assure1::MessageBus::Client->new({
    log       => $Log,
    config    => $config,
    workqueue => \$MBQueue
});

threads->new(sub {
    # if we can't start the handler thread we want to kill the app
    threads->set_thread_exit_only(0);

    eval {
        local $SIG{__DIE__};
        $MBClient->handle();
    };

    if ($@) {
        $Self->{Log}->Message('FATAL', '{SIGDIE}: ' . $@);
    }
})->detach();

Methods

The Message Bus Client library contains the methods described below.

ClientRequest

A generic client request to call an RPC method with arguments.

ClientRequest(\%options)

Options

method    - The RPC method to call.
arguments - The optional arguments to pass with the request.
server    - (optional) The recipient server. Defaults to the value of server() which defaults to local Unified Assurance broker
blocking  - (optional) Whether to return after sending request without waiting for a response. Defaults to the current blocking() setting. The response can be retrieved later with a GetResponse() if necessary.

Returns

Hashref containing the following response keys:
Success -> 1 for Success, 0 for Failure, or -1 for in non-blocking mode and no response= yet
Message -> RPC response message, error text if failed
Data    -> Data returned or empty array if nothing. If blocking = 1 returns the message ID with which to use with a subsequent GetResponse() call

GetResponse

Requests a response to the specified message.

GetResponse($MessageID, $Blocking)

Arguments

MessageID - The ID of the message to request the response for.
Blocking  - (optional) 1 to wait for response, 0 to return data if ready or without waiting, defaults to current blocking() setting.

Returns

Hashref containing the following response keys:
Success - 1 for Success, 0 for Failure, or -1 when in non-blocking mode and no response yet.
Message - RPC response data. Error text if failed, or if blocking = 1 returns the message ID.

Synopsis

# Sending a message to local broker
Broker::Client->Reply({
   orig_msg => $msg,
   reply    => {success => 1, message => 'returning 2 items', data => [1,2]}
});

Reply

Replies to the specified message.

Reply(\%options)

Options

reply    - Reply hash, gets converted to a JSON string. Must contain the following keys:
             success - Boolean. true for success or false for failure.
             message - Optional message, if success=false, this should be the error message and data should be blank.
             data    - The array of data to return, if any. Items can be any data type except blessed objects, even hashes or arrays.
orig_msg - Original message information the reply is for

Synopsis

# Sending a message to local broker
Broker::Client->Reply({
   orig_msg => $msg,
   reply    => {success => 1, message => 'returning 2 items', data => [1,2]}
});

SendHealthMetric

Sends a health metric to InfluxDB by way of Telegraf.

SendHealthMetric(\%options)

Options

measurement      -> The Metric Type Name
tags             -> A hash reference of:
    host         -> (optional) Device Host (defaults to local HostFQDN)
    instanceName -> Instance Name (for example, Application Name)
fields           -> A hash reference of:
    value        -> The value to send
    utilization  -> (optional) The utilization of value if a max value is known.
epochTime        -> (optional) The epoch time collected (defaults to time()). Oracle recommends using the epoch from time() rounded to nearest poll time.

SendThresholdViolation

Queues a threshold violation.

SendThresholdViolation(\%options)

Options

ThresholdType      -> PollerThresholds.ThresholdTypeID (1 => Standard, 10 => Trend % Change, 14 => Abnormal, 16 => Missing Data, 20 => Trend Prediction)
ThresholdName      -> PollerThresholds.ThresholdName
ThresholdMessage   -> PollerThresholds.Message
ThresholdTimeRange -> PollerThresholds.TimeRange
ThresholdOperator  -> PollerThresholds.Warning|CriticalOperator
ThresholdValue     -> PollerThresholds.Warning|CriticalValue
ThresholdSeverity  -> PollerThresholds.Warning|CriticalSeverity
Time               -> EpochTimeStamp
Measurement        -> PollerThresholds.Measurement
MetricValue        -> InfluxDB measurement Mean value|availability|utilization (from MetricField)
DeviceName         -> InfluxDB measurement deviceName
InstanceName       -> InfluxDB measurement instance
DeviceZoneID       -> InfluxDB measurement deviceZone

blocking

Gets the current block status or sets whether to block waiting for responses.

blocking([$block])

clusterID

Gets the current clusterID or sets the clusterID for clustered polling and load balancing. Should be the FailoverName given when joining a pool.

clusterID([$clusterID])

close

Closes the current connection to the message bus.

close()

server

Gets the current server or sets the server to send ClientRequests to. Should be the FQDN of the Unified Assurance broker.

server([$server])

workqueue

Gets the current work queue or sets the work queue. Should be a reference to a Thread::Queue.

workqueue([$workqueue])