Client¶
The Message Bug Client library implements the client methods that a process needs in order to interact with the message bus.
Synopsis¶
use Assure1::MessageBus::Client;
my $MBClient = Assure1::MessageBus::Client($baseDir, \[{OPTIONS}\]);
my $response = $mbClient->ClientRequest(METHOD, \[ARGUMENTS\]);
Constructor¶
The Message Bus Client library contains the constructor described below.
new¶
Creates a new client for sending and receiving messages.
new(\%options)
Options
log -> Assure1::Log instance, omitt to disable logging
config -> Assure1::Config instance
workqueue -> Optional reference to a Thread::Queue to send incoming messages to for processing, if not given will ignore any incoming broker messages that are not replies.
Incoming messages are queued in the following frozen format (use Storable):
body - message body; this can contain any number of formats such as a hash, array, or string
type - message type (either "direct" or "failover")
rawmessage - raw message info including message properties; use when sending replies through reply() as 'orig_msg'
bindings -> Additional exchanges to bind to (e.g. EX_THRESHOLD)
timeout -> Time to wait for response before timing out. Default 30 seconds
server -> Specific broker to talk to; should be the FQDN. Defaults to to local hostfqdn
blocking -> set whether to block on ClientRequest and GetResponse. Defaults to 1
Returns
Assure1::MessageBus::Client object
Synopsis
my $MBQueue = new Thread::Queue;
my $MBClient = Assure1::MessageBus::Client->new({
log => $Log,
config => $config,
workqueue => \$MBQueue
});
threads->new(sub {
# if we can't start the handler thread we want to kill the app
threads->set_thread_exit_only(0);
eval {
local $SIG{__DIE__};
$MBClient->handle();
};
if ($@) {
$Self->{Log}->Message('FATAL', '{SIGDIE}: ' . $@);
}
})->detach();
Methods¶
The Message Bus Client library contains the methods described below.
ClientRequest¶
A generic client request to call an RPC method with arguments.
ClientRequest(\%options)
Options
method - The RPC method to call.
arguments - The optional arguments to pass with the request.
server - (optional) The recipient server. Defaults to the value of server() which defaults to local Unified Assurance broker
blocking - (optional) Whether to return after sending request without waiting for a response. Defaults to the current blocking() setting. The response can be retrieved later with a GetResponse() if necessary.
Returns
Hashref containing the following response keys:
Success -> 1 for Success, 0 for Failure, or -1 for in non-blocking mode and no response= yet
Message -> RPC response message, error text if failed
Data -> Data returned or empty array if nothing. If blocking = 1 returns the message ID with which to use with a subsequent GetResponse() call
GetResponse¶
Requests a response to the specified message.
GetResponse($MessageID, $Blocking)
Arguments
MessageID - The ID of the message to request the response for.
Blocking - (optional) 1 to wait for response, 0 to return data if ready or without waiting, defaults to current blocking() setting.
Returns
Hashref containing the following response keys:
Success - 1 for Success, 0 for Failure, or -1 when in non-blocking mode and no response yet.
Message - RPC response data. Error text if failed, or if blocking = 1 returns the message ID.
Synopsis
# Sending a message to local broker
Broker::Client->Reply({
orig_msg => $msg,
reply => {success => 1, message => 'returning 2 items', data => [1,2]}
});
Reply¶
Replies to the specified message.
Reply(\%options)
Options
reply - Reply hash, gets converted to a JSON string. Must contain the following keys:
success - Boolean. true for success or false for failure.
message - Optional message, if success=false, this should be the error message and data should be blank.
data - The array of data to return, if any. Items can be any data type except blessed objects, even hashes or arrays.
orig_msg - Original message information the reply is for
Synopsis
# Sending a message to local broker
Broker::Client->Reply({
orig_msg => $msg,
reply => {success => 1, message => 'returning 2 items', data => [1,2]}
});
SendHealthMetric¶
Sends a health metric to InfluxDB by way of Telegraf.
SendHealthMetric(\%options)
Options
measurement -> The Metric Type Name
tags -> A hash reference of:
host -> (optional) Device Host (defaults to local HostFQDN)
instanceName -> Instance Name (for example, Application Name)
fields -> A hash reference of:
value -> The value to send
utilization -> (optional) The utilization of value if a max value is known.
epochTime -> (optional) The epoch time collected (defaults to time()). Oracle recommends using the epoch from time() rounded to nearest poll time.
SendThresholdViolation¶
Queues a threshold violation.
SendThresholdViolation(\%options)
Options
ThresholdType -> PollerThresholds.ThresholdTypeID (1 => Standard, 10 => Trend % Change, 14 => Abnormal, 16 => Missing Data, 20 => Trend Prediction)
ThresholdName -> PollerThresholds.ThresholdName
ThresholdMessage -> PollerThresholds.Message
ThresholdTimeRange -> PollerThresholds.TimeRange
ThresholdOperator -> PollerThresholds.Warning|CriticalOperator
ThresholdValue -> PollerThresholds.Warning|CriticalValue
ThresholdSeverity -> PollerThresholds.Warning|CriticalSeverity
Time -> EpochTimeStamp
Measurement -> PollerThresholds.Measurement
MetricValue -> InfluxDB measurement Mean value|availability|utilization (from MetricField)
DeviceName -> InfluxDB measurement deviceName
InstanceName -> InfluxDB measurement instance
DeviceZoneID -> InfluxDB measurement deviceZone
blocking¶
Gets the current block status or sets whether to block waiting for responses.
blocking([$block])
clusterID¶
Gets the current clusterID or sets the clusterID for clustered polling and load balancing. Should be the FailoverName given when joining a pool.
clusterID([$clusterID])
close¶
Closes the current connection to the message bus.
close()
server¶
Gets the current server or sets the server to send ClientRequests to. Should be the FQDN of the Unified Assurance broker.
server([$server])
workqueue¶
Gets the current work queue or sets the work queue. Should be a reference to a Thread::Queue.
workqueue([$workqueue])