How to Monitor Acronis Protected Workloads using PRTG

Acronis
Acronis Cyber Protect Cloud
for Service Providers

Expand, automate and scale your service delivery capabilities with cyber protection. Acronis Cyber Platform — the foundation of Acronis’ cyber protection solutions —provides a public API, comprehensive documentation, sample code, training and community forums to empower developers like you to differentiate.

Acronis’ integration with Paessler PRTG is among the multiple ecosystem integrations Acronis released this year. This integration helps PRTG administrators reduce complexity by efficiently monitoring cyber protection plans and statuses through the PRTG interface. 

This article will look at how you can leverage the Acronis Cyber Platform to extend the capabilities of Acronis and Paessler PRTG integration and maximize the cyber protection monitoring you offer to clients.

Monitor Acronis protected workloads with Python Script Advanced Sensor

For our sensor, we use Python Script Advanced Sensor. Acronis Cyber Protect Cloud has a cloud-centric model, so we will run that sensor from the PRTG server/cluster and collect information regarding all workloads protected by Acronis.

For simplicity of that sensor example, we know that PRTG server/cluster is a highly protected environment, so our Python script will receive API client information as an input to issue tokens and be authorized in the Acronis Cyber Protect Cloud. Please consult with your security team to see if this approach aligns with your own internal regulations.

Start by writing a script to run the scan as the base. The sensor_example.py file from PRTG installation can be used as the start point

# -*- coding: utf-8 -*-

import JSON

import sys

from prtg.sensor.result import CustomSensorResult

from prtg.sensor.units import ValueUnit

if __name__ == "__main__":

    try:

        data = json.loads(sys.argv[1])

        csr = CustomSensorResult(text="This sensor runs on %s" % data["host"])

        csr.add_primary_channel(name="Percentage",

                                value=87,

                                unit=ValueUnit.PERCENT,

                                is_float=False,

                                is_limit_mode=True,

                                limit_min_error=10,

                                limit_max_error=90,

                                limit_error_msg="Percentage too high")

        csr.add_channel(name="Response Time",

                        value=4711,

                        unit=ValueUnit.TIMERESPONSE)

        print(csr.json_result)

    except Exception as e:

        csr = CustomSensorResult(text="Python Script execution error")

        csr.error = "Python Script execution error: %s" % str(e)

        print(csr.json_result)

First, we need to receive information to successfully authorize the Acronis Cyber Protect Cloud. To do so, we create an API client and put its report into the JSON file, like this (real values hidden as they are credentials to access the Acronis Cyber Protect Cloud):

{

   "client_id": "12….d6",

   "secret:" "41….d0",

   "base_url": https://dev-cloud.acronis.com

}


This JSON will be an additional parameter for the Python Script Advanced Sensor.

Next, we need to add these parameters, reading from the Python Script Advanced Sensor input.

# Load JSON from Additional Parameters filed in the Sensor UI

connection_parameters = json.loads(data[params])

# Take values from JSON

client_id = connection_parameters[client_id]

client_secret = connection_parameters[secret]

base_url = connection_parameters[base_url]

Next, add an authorization token issue code as well as alert number requests from the Acronis Cyber Protect Cloud.

# Get alerts count for a tenant defined in the token scope

def get_partner_alerts_count(base_url: str, token: str):

    response = requests.get(

        f'{base_url}/api/alert_manager/v1/count',

        auth=BearerAuth(token)

    )

    if response.ok:

        return response.json()[total]

    else:

        raise ValueError(response.text)

# Issue a JWT token based on an API Client

def issue_token(base_url: str, client_id: str, client_secret: str):

    response = requests.post(

        f'{base_url}/api/2/idp/token',

        headers={'Content-Type': 'application/x-www-form-urlencoded'},

        auth=(client_id, client_secret),

        data={'grant_type': 'client_credentials'}

    )

    if response.ok:

        return response.json()[access_token]

    else:

        raise ValueError(response.text)

Now, our first iteration of Acronis Sensor is ready; we only need to configure PRTG data.

# -*- coding: utf-8 -*-

import json

import sys

import requests

from prtg.sensor.result import CustomSensorResult

from prtg.sensor.units import ValueUnit

# Class to support bearer authorization in requests

class BearerAuth(requests.auth.AuthBase):

    def __init__(self, token):

        self.token = token

    def __call__(self, r):

        r.headers[Authorization] = Bearer  + self.token

        return r

# Get alerts count for a tenant defined in the token scope

def get_partner_alerts_count(base_url: str, token: str):

    response = requests.get(

        f'{base_url}/api/alert_manager/v1/count',

        auth=BearerAuth(token)

    )

    if response.ok:

        return response.json()[total]

    else:

        raise ValueError(response.text)

# Issue a JWT token based on an API Client

def issue_token(base_url: str, client_id: str, client_secret: str):

    response = requests.post(

        f'{base_url}/api/2/idp/token',

        headers={'Content-Type': 'application/x-www-form-urlencoded'},

        auth=(client_id, client_secret),

        data={'grant_type': 'client_credentials'}

    )

    if response.ok:

        return response.json()[access_token]

    else:

        raise ValueError(response.text)

if __name__ == "__main__":

    try:

        # Load JSON sent to the function

        data = json.loads(sys.argv[1])

        # Load JSON from Additional Parameters filed in the Sensor UI

        connection_parameters = json.loads(data[params])

        # Take values from JSON

        client_id = connection_parameters[client_id]

        client_secret = connection_parameters[secret]

        base_url = connection_parameters[base_url]

        # All API calls should be authorized by bearer

        # Issue a bearer token

        token = issue_token(base_url, client_id, client_secret)

        # Start to build sensor output

        # It's sub-headline for the sensor

        csr = CustomSensorResult(

            text=This sensor show Acronis information for the partner: %s % partner_name)

        # Add the primary channel data

  csr.add_primary_channel(name="Alerts",

                                value=get_partner_alerts_count(

                                    base_url, token),

                                unit=ValueUnit.COUNT,

                                is_float=False,

                                is_limit_mode=False)

        # Sensor data output

        # DO NOT OUTPUT ANYTHIN ELSE IT WILL BROKE SENSOR

        print(csr.json_result)

    # General exception processing

    except Exception as e:

        csr = CustomSensorResult(text="Python Script execution error")

        csr.error = "Python Script execution error: %s" % str(e)

        print(csr.json_result)



Copy that Python script to the %Program Files (x86)%\PRTG Network Monitor\Custom Sensors\python directory. In our case, we use the acronis_example.py name.

Now we are ready to configure the sensor. Select Add sensor and then find and add Python Script Advanced

Configuring Python Script Advanced Sensor

Select Python script name (acronis_example.py) and copy JSON with the API Client information to the additional parameters field.

Configured Python Script Advanced Sensor

Now save the sensor and check that it works.

Acronis Python Script Advanced Sensor

Congratulations! We have our Acronis monitoring sensor ready and running!

Now, let’s extend it a little bit. At first, make it a bit more informative by adding a partner name, and adding that partner’s children’s organizations with their alerts count as the next step. It’s very straightforward. The only thing that needs attention paid to is that the alerts API takes the scope from a token, so we need to re-issue the token  for a children’s organization to receive their alert count.

See below the modified Python sensor file.

# -*- coding: utf-8 -*-

import json

import sys

import requests

from prtg.sensor.result import CustomSensorResult

from prtg.sensor.units import ValueUnit

# Class to support bearer authorization in requests

class BearerAuth(requests.auth.AuthBase):

    def __init__(self, token):

        self.token = token

    def __call__(self, r):

        r.headers[Authorization] = Bearer  + self.token

        return r

# Get a partner tenant UUID by an API Client id

def get_partner_tenant_id(base_url: str, client_id: str, token: str):

    response = requests.get(

        f'{base_url}/api/2/clients/{client_id}',

        auth=BearerAuth(token)

    )

    if response.ok:

        return response.json()[tenant_id]

    else:

        raise ValueError(response.text)

# Provide JSON array of children of the tenant with specified tenant id

def get_partner_customers(base_url: str, token: str, tenant_id: str):

    response = requests.get(

        f'{base_url}/api/2/tenants?parent_id={tenant_id}',

        auth=BearerAuth(token)

    )

    if response.ok:

        return response.json()[items]

    else:

        raise ValueError(response.text)

# Get a partner tenannt name by an API Client id

def get_partner_name(base_url: str, client_id: str, token: str):

    tenant_id = get_partner_tenant_id(base_url, client_id, token)

    response = requests.get(

        f'{base_url}/api/2/tenants/{tenant_id}',

        auth=BearerAuth(token)

    )

    if response.ok:

        return response.json()[name]

    else:

        raise ValueError(response.text)

# Get a tenant name by tenant UUID

def get_tenant_name(base_url: str, tenant_id: str, token: str):

    response = requests.get(

        f'{base_url}/api/2/tenants/{tenant_id}',

        auth=BearerAuth(token)

    )

    if response.ok:

        return response.json()[name]

    else:

        raise ValueError(response.text)

# Get alerts count for a tenant defined in the token scope

def get_partner_alerts_count(base_url: str, token: str):

    response = requests.get(

        f'{base_url}/api/alert_manager/v1/count',

        auth=BearerAuth(token)

    )

    if response.ok:

        return response.json()[total]

    else:

        raise ValueError(response.text)

# Issue a JWT token based on an API Client

def issue_token(base_url: str, client_id: str, client_secret: str):

    response = requests.post(

        f'{base_url}/api/2/idp/token',

        headers={'Content-Type': 'application/x-www-form-urlencoded'},

        auth=(client_id, client_secret),

        data={'grant_type': 'client_credentials'}

    )

    if response.ok:

        return response.json()[access_token]

    else:

        raise ValueError(response.text)

# Exchange upper level token to a token scoped by subtenant

def issue_scoped_token(base_url: str, token: str, tenant_id: str):

    params = {

        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',

        'assertion': f'{token}',

        'scope': f'urn:acronis.com:tenant-id:{tenant_id}'

    }

    response = requests.post(

        f'{base_url}/api/2/idp/token',

        headers={'Content-Type': 'application/x-www-form-urlencoded'},

        auth=BearerAuth(token),

        data=params

    )

    if response.ok:

        return response.json()[access_token]

    else:

        raise ValueError(response.text)

if __name__ == __main__:

    try:

        # Load JSON sent to the function

        data = json.loads(sys.argv[1])

        # Load JSON from Additional Parameters filed in the Sensor UI

        connection_parameters = json.loads(data[params])

        # Take values from JSON

        client_id = connection_parameters[client_id]

        client_secret = connection_parameters[secret]

        base_url = connection_parameters[base_url]

        # All API calls should be authorized by bearer

        # Issue a bearer token

        token = issue_token(base_url, client_id, client_secret)

        # Get a prtner tenant id

        partner_tenant_id = get_partner_tenant_id(base_url, client_id, token)

        # Get a partner name

        partner_name = get_partner_name(base_url, client_id, token)

        # Start to build sensor output

        # It's sub-headline for the sensor

        csr = CustomSensorResult(

            text=This sensor show Acronis information for the partner: %s % partner_name)

        # Add the primary channel data

        csr.add_primary_channel(name=partner_name +  Alerts,

                                value=get_partner_alerts_count(

                                    base_url, token),

                                unit=ValueUnit.COUNT,

                                is_float=False,

                                is_limit_mode=False)

        # Take children tenants list 

        customers = get_partner_customers(base_url, token, partner_tenant_id)

        # Loop throught the children tenants list

        for customer in customers:

            # Scope token for the current child tenant

            scoped_token = issue_scoped_token(base_url, token, customer[id])

            # Use that token to check number of alerts and

            # Put it into the sensor channel

            csr.add_channel(name=Customer  + customer[name] +  Alerts,

                                 value=get_partner_alerts_count(

                                     base_url, scoped_token),

                                 unit=ValueUnit.COUNT,

                                 is_float=False,

                                 is_limit_mode=False)

        # Sensor data output

        # DO NOT OUTPUT ANYTHIN ELSE IT WILL BROKE SENSOR

        print(csr.json_result)

    # General exception processing

    except Exception as e:

        csr = CustomSensorResult(text=Python Script execution error)

        csr.error = Python Script execution error: %s % str(e)

        print(csr.json_result)

Update the script, and PRTG will use the new version automatically. As a result, your Acronis sensor should resemble the following.

Extended Acronis Python Script Advanced Sensor

Summary

As you can see, it’s pretty simple to build a sophisticated dashboard to monitor the Acronis Cyber Protect Cloud with PRTG and Python Script Advanced Sensor.

The code example above can be used to build nearly any kind of Acronis Cyber Protect Cloud monitoring sensors for PRTG. We hope this article and code examples help you maintain and protect your infrastructure and workloads. 

For further API learning, register at our Platform Program or Developer Network on https://developer.acronis.com site.

Find our public Postman API collection for standard automation and integration tasks https://explore.postman.com/grey-rocket-585331 or use our GitHub examples https://github.com/acronis.

About Acronis

Acronis is a Swiss company, founded in Singapore. Celebrating two decades of innovation, Acronis has more than 1,800 employees in 45 locations. The Acronis Cyber Protect Cloud solution is available in 26 languages in over 150 countries and is used by 20,000 service providers to protect over 750,000 businesses.