Fortinet white logo
Fortinet white logo
2024.10.0

Polling Detections Continuously

Polling Detections Continuously

In this section, we show how to use the FncApiClient to continuously poll detections.

Polling Arguments

When polling detections continuously, it is possible to enrich the information being retrieved or filter it if necessary. This can be done with the arguments passed to the polling method in a dictionary. The allowed arguments are described below.

polling_args = {

'polling_delay': 10,

'status': 'active',

'pull_muted_detections': 'false',

'pull_muted_rules': 'false',

'pull_muted_devices': 'false',

'include_description': True,

'include_signature': True,

'include_pdns': True,

'include_dhcp': True,

'include_events': True,

'filter_training_detections': True,

'start_date': ''

}

Attribute

Type

Default

Description

polling_delay

int

10

This is expressed in minutes and represent a delay in retrieving detections to allow time for them to be processed by the backend services. It is recommended to be 10 minutes which is the default value.

status

string

‘active’

This argument is used to filter the retrieved detections according to the detections’ status (active or resolved). Only active detections are retrieved by default.

pull_muted_detections,

pull_muted_rules,

pull_muted_devices

string

‘false’

These arguments are used to filter the retrieved detections depending on whether they are muted or not. The supported values are (‘false’, ‘true’, ‘all’).

include_descriptions,

include_signature,

include_pdns, include_dhcp,

include_events

bool

False

These arguments are used to enrich the information being retrieved. If set to true, they tell the method to include the rule’s description or signature, entity’s pdns or dhcp information and detection’s associated events.

filter_training_detections

bool

True

This argument is used to filter any training detections. By default, it is set to true. Set it to false if training detections are required.

start_date

datetime

now

This argument is used to state since when detections need to be retrieved. This timestamp is used only if no checkpoint is provided in the context.

limit

int

This argument is used to limit the number of detections retrieved with each piece of historical data. It should not be used for continuous polling or some detections might be missed.

API Context

The context used during the continuous polling provides information about what was already retrieved and what needs to be retrieved. It holds three pieces of information:

Attribute

Type

Description

checkpoint

string

The value contained in the checkpoint, was the end_date in the last performed search. If it is included in the context while calling to the polling method, its value will be used as the start_date. The checkpoint is updated every time the polling method is called.

Polling_args

dictionary

After the polling method is called, the context will update the polling_args value with a dictionary containing all the arguments that were send to the FortiNDR Cloud REST API when the detections were requested. This allows you to perform the same request in case of a failure.

history

dictionary

The FncClient Library allows to pull historical data when pulling detections. However, it is possible that the amount of historical information is too big that it causes delays on the retrieval of current data. To avoid this delay, the client library allows you to split the context in historical and current data. This way current data and historical data can be retrieved separately. If the history value is provided in the context, it means we are retrieving historical data.

Splitting Historical and Current Data

The client library provides a method (get_splitted_context) that provide two contexts: one with the history value set to pull data from the provided start_date up to now and a second one without a history value and the checkpoint set to now. In this way we can pull the historical data using the first context while pulling the current data using the second one.

Example:
# Split the poling interval in history and current
h_context, context = client.get_splitted_context(args=polling_args)

Poll History Method

Once the context has been split, we are ready to start pulling the historical data using the FncApiClient.poll_history method. This method retrieves a piece of the historical data. The size of the piece is determined by the interval argument which default to one day. If the limit is not reached, the method tries to pull the next piece until the limit is reached or overpassed. Every time the poll_history method is called it updates the start_date information in the context’s history value until it reaches the end_date. At this point, the whole historical data would have been polled.

Note

The get_splitted_context method is only called the first time. After that, the history value in the context is used. This is done to avoid pulling the same information over and over again.

The steps below show and describe the process for pulling the historical data.

1. Imports

The main classes required while calling specific endpoint are shown below.

from fnc.fnc_client import FncClient

from fnc.api import ApiContext, FncApiClient

from fnc.errors import FncClientError

from fnc.utils import datetime_to_utc_str

Get the Client

The FncApiClient is created using the FncClient class’s method and providing the required arguments. For a detailed description, Getting the client.

3. Prepare Polling Arguments and Context

When calling the poll_history method, we need to pass a dictionary with all the arguments. The arguments are used for filtering and enriching the detection’s information. For a detailed description, see . Polling Arguments in Polling Detections Continuously.

polling_args = {
    'polling_delay': 10, 
    'status':  'active', 
    'pull_muted_detections': 'false', 
    'pull_muted_rules':  'false', 
    'pull_muted_devices':  'false', 
    'include_description': True, 
    'include_signature': True, 
    'include_pdns': True, 
    'include_dhcp': True, 
    'include_events': True, 
    'filter_training_detections': True, 
    'limit': 500, 
    'start_date': '' 
}
4. Split the Context

We also need to pass the context with the history value to the poll_history method.

# Split the poling interval in history and current

h_context, context = client.get_splitted_context(args=polling_args)

The history field in the context contains the start and end date to be pulled. It can be created manually but using the method above ensures duplications will be avoided since the end date of the history context will be the checkpoint in the current one. However, it can only be used once. After that, if the context needs to be recreated it will need to be updated manually with the last history value.

5. Poll the next piece of historical data

The FncApiClient’s poll_history method is used to poll historical data. This method is a generator function and need to be used within a loop.

# The args for poll_history should be the same as for the continuous polling
for response in client.poll_history(
    context=h_context, 
    args=polling_args, 
    interval= timedelta(days=1) 
): 
    # Do Something... 
    client.get_logger().info(response)
6. Clear Context’s Arguments

The polling_args value in the context contains the la arguments used in the last request. If they are present in the context while the polling method is called, they will be used. Therefore, they need to be cleared unless it is required to perform the same call as before.

# Ensure each iteration start without polling_args in the context
h_context.clear_args()
7. Handle Errors

Any exception occurring while calling the poll_history method will be raised as a FncClientError exception. The specific problem can be identified by using the FncClientError fields.

The code below shows a full example of polling detection’s historical data.

from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError

client_name = ''
api_token = ''
domain = ''
log_level = None

client: FncApiClient = FncClient.get_api_client(
    name=client_name, 
    domain=domain, 
    api_token=api_token
)
client.get_logger().set_level(level=log_level) 

try: 
    polling_args = {
      'polling_delay': 10, 
      'status':  'active', 
      'pull_muted_detections': 'false', 
      'pull_muted_rules':  'false', 
      'pull_muted_devices':  'false', 
      'include_description': True, 
      'include_signature': True, 
      'include_pdns': True, 
      'include_dhcp': True, 
      'include_events': True, 
      'filter_training_detections': True, 
      'limit': 500, 
      'start_date': '2024-01-01T00:00:00.000000Z'
    }

    # Split the poling interval in history and current
    h_context, context = client.get_splitted_context(args=polling_args) 

    # The polling args for poll_history should be the same as
    # for the continuous polling
    for response in client.poll_history(context=h_context, args=polling_args): 
      # Do Something... 
      client.get_logger().info(response) 

    # Ensure each iteration start without polling_args in the context
    h_context.clear_args()

    # This is the end of the iteration. It can be called in a loop until
    # completed or wait for some time between iterations. It only requires
    # the context with the history value

except FncClientError as e: 
    # Any exception will be reported as FncClientError. Specific Error
    # message will be added to the exception depending on its Error Type
    client.get_logger().error(e)

Continuous Polling Method

Once the context is split, we are ready to start pulling detections using the FncApiClient.continuous_polling method. This method retrieves detections since the timestamp stored in the context’s checkpoint or the provided start date. If none of those values are provided the start_date will be set to the current time. After the call is completed, the checkpoint value in the context is updated to the used end_date to avoid overlaps in the search windows.

The steps below show and describe the process for pulling detections continuously.

1. Imports

The main classes required while polling detections are shown below.

from fnc.fnc_client import FncClient
from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError
2. Get the Client

The FncApiClient is created using the FncClient class’s method and providing the required arguments. For a detailed description, see Getting the client.

3. Prepare Polling Arguments

When calling the continuous_polling method, we need to pass a dictionary with all the arguments. The arguments are used for filtering and enriching the detection’s information. For a detailed description, see . Polling Arguments in Polling Detections Continuously.

polling_args = {

'polling_delay': 10,

'status': 'active',

'pull_muted_detections': 'false',

'pull_muted_rules': 'false',

'pull_muted_devices': 'false',

'include_description': True,

'include_signature': True,

'include_pdns': True,

'include_dhcp': True,

'include_events': True,

'filter_training_detections': True,

'start_date': '' # Get configured Start Date or now as utc

}

4. Prepare the Context

When calling the continuous_polling method, we need to pass a context. Its checkpoint value will be used as the start_date for the search. It needs to be updated to the last call’s checkpoint if it has not been updated already.

context = ApiContext()
context.update_checkpoint(checkpoint= checkpoint)
5. Poll detections

Using the polling strategy described above, the continuous_polling method will perform the next search and retrieve the detections. The FncApiClient’s continuous_polling method is a generator function and need to be used within a loop.

for response in client.continuous_polling(context=context, args=polling_args):

# Do Something...

client.get_logger().info(response)

6. Persist checkpoint and clear Context’s Arguments

The polling_args value in the context contains the la arguments used in the last request. If they are present in the context while the polling method is called, they will be used. Therefore, they need to be cleared unless it is required to perform the same call as before.

# Persist checkpoint if needed
checkpoint = context.get_checkpoint() 

# Ensure each iteration start without polling_args in the context
context.clear_args()
7. Handle Errors

Any exception occurring while calling the continuous_polling method will be raised as a FncClientError exception. The specific problem can be identified by using the FncClientError fields.

The code below shows a full example of continuously polling detections.

from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError
from fnc.fnc_client import FncClient

client_name = ''
api_token = ''
domain = ''
log_level = None

try: 
    client: FncApiClient = FncClient.get_api_client(
      name=client_name, 
      domain=domain, 
      api_token=api_token
    ) 
    client.get_logger().set_level(level=log_level) 

    polling_args = {
      'polling_delay': 10, 
      'status':  'active', 
      'pull_muted_detections': 'false', 
      'pull_muted_rules':  'false', 
      'pull_muted_devices':  'false', 
      'include_description': True, 
      'include_signature': True, 
      'include_pdns': True, 
      'include_dhcp': True, 
      'include_events': True, 
      'filter_training_detections': True
    }

    checkpoint = # Get persisted checkpoint or ''
    start_date_str = # Get configured Start Date or now as utc

    polling_args['start_date'] = start_date_str

    context = ApiContext()
    context.update_checkpoint(checkpoint= checkpoint) 

    for response in client.continuous_polling(context=context, args=polling_args): 
      client.get_logger().info(response) 

    # Persist checkpoint if needed 
    checkpoint = context.get_checkpoint() 
    # Ensure each iteration start without polling_args in the context
    context.clear_args()

except FncClientError as e: 
    client.get_logger().error(e)

Polling Detections Continuously

Polling Detections Continuously

In this section, we show how to use the FncApiClient to continuously poll detections.

Polling Arguments

When polling detections continuously, it is possible to enrich the information being retrieved or filter it if necessary. This can be done with the arguments passed to the polling method in a dictionary. The allowed arguments are described below.

polling_args = {

'polling_delay': 10,

'status': 'active',

'pull_muted_detections': 'false',

'pull_muted_rules': 'false',

'pull_muted_devices': 'false',

'include_description': True,

'include_signature': True,

'include_pdns': True,

'include_dhcp': True,

'include_events': True,

'filter_training_detections': True,

'start_date': ''

}

Attribute

Type

Default

Description

polling_delay

int

10

This is expressed in minutes and represent a delay in retrieving detections to allow time for them to be processed by the backend services. It is recommended to be 10 minutes which is the default value.

status

string

‘active’

This argument is used to filter the retrieved detections according to the detections’ status (active or resolved). Only active detections are retrieved by default.

pull_muted_detections,

pull_muted_rules,

pull_muted_devices

string

‘false’

These arguments are used to filter the retrieved detections depending on whether they are muted or not. The supported values are (‘false’, ‘true’, ‘all’).

include_descriptions,

include_signature,

include_pdns, include_dhcp,

include_events

bool

False

These arguments are used to enrich the information being retrieved. If set to true, they tell the method to include the rule’s description or signature, entity’s pdns or dhcp information and detection’s associated events.

filter_training_detections

bool

True

This argument is used to filter any training detections. By default, it is set to true. Set it to false if training detections are required.

start_date

datetime

now

This argument is used to state since when detections need to be retrieved. This timestamp is used only if no checkpoint is provided in the context.

limit

int

This argument is used to limit the number of detections retrieved with each piece of historical data. It should not be used for continuous polling or some detections might be missed.

API Context

The context used during the continuous polling provides information about what was already retrieved and what needs to be retrieved. It holds three pieces of information:

Attribute

Type

Description

checkpoint

string

The value contained in the checkpoint, was the end_date in the last performed search. If it is included in the context while calling to the polling method, its value will be used as the start_date. The checkpoint is updated every time the polling method is called.

Polling_args

dictionary

After the polling method is called, the context will update the polling_args value with a dictionary containing all the arguments that were send to the FortiNDR Cloud REST API when the detections were requested. This allows you to perform the same request in case of a failure.

history

dictionary

The FncClient Library allows to pull historical data when pulling detections. However, it is possible that the amount of historical information is too big that it causes delays on the retrieval of current data. To avoid this delay, the client library allows you to split the context in historical and current data. This way current data and historical data can be retrieved separately. If the history value is provided in the context, it means we are retrieving historical data.

Splitting Historical and Current Data

The client library provides a method (get_splitted_context) that provide two contexts: one with the history value set to pull data from the provided start_date up to now and a second one without a history value and the checkpoint set to now. In this way we can pull the historical data using the first context while pulling the current data using the second one.

Example:
# Split the poling interval in history and current
h_context, context = client.get_splitted_context(args=polling_args)

Poll History Method

Once the context has been split, we are ready to start pulling the historical data using the FncApiClient.poll_history method. This method retrieves a piece of the historical data. The size of the piece is determined by the interval argument which default to one day. If the limit is not reached, the method tries to pull the next piece until the limit is reached or overpassed. Every time the poll_history method is called it updates the start_date information in the context’s history value until it reaches the end_date. At this point, the whole historical data would have been polled.

Note

The get_splitted_context method is only called the first time. After that, the history value in the context is used. This is done to avoid pulling the same information over and over again.

The steps below show and describe the process for pulling the historical data.

1. Imports

The main classes required while calling specific endpoint are shown below.

from fnc.fnc_client import FncClient

from fnc.api import ApiContext, FncApiClient

from fnc.errors import FncClientError

from fnc.utils import datetime_to_utc_str

Get the Client

The FncApiClient is created using the FncClient class’s method and providing the required arguments. For a detailed description, Getting the client.

3. Prepare Polling Arguments and Context

When calling the poll_history method, we need to pass a dictionary with all the arguments. The arguments are used for filtering and enriching the detection’s information. For a detailed description, see . Polling Arguments in Polling Detections Continuously.

polling_args = {
    'polling_delay': 10, 
    'status':  'active', 
    'pull_muted_detections': 'false', 
    'pull_muted_rules':  'false', 
    'pull_muted_devices':  'false', 
    'include_description': True, 
    'include_signature': True, 
    'include_pdns': True, 
    'include_dhcp': True, 
    'include_events': True, 
    'filter_training_detections': True, 
    'limit': 500, 
    'start_date': '' 
}
4. Split the Context

We also need to pass the context with the history value to the poll_history method.

# Split the poling interval in history and current

h_context, context = client.get_splitted_context(args=polling_args)

The history field in the context contains the start and end date to be pulled. It can be created manually but using the method above ensures duplications will be avoided since the end date of the history context will be the checkpoint in the current one. However, it can only be used once. After that, if the context needs to be recreated it will need to be updated manually with the last history value.

5. Poll the next piece of historical data

The FncApiClient’s poll_history method is used to poll historical data. This method is a generator function and need to be used within a loop.

# The args for poll_history should be the same as for the continuous polling
for response in client.poll_history(
    context=h_context, 
    args=polling_args, 
    interval= timedelta(days=1) 
): 
    # Do Something... 
    client.get_logger().info(response)
6. Clear Context’s Arguments

The polling_args value in the context contains the la arguments used in the last request. If they are present in the context while the polling method is called, they will be used. Therefore, they need to be cleared unless it is required to perform the same call as before.

# Ensure each iteration start without polling_args in the context
h_context.clear_args()
7. Handle Errors

Any exception occurring while calling the poll_history method will be raised as a FncClientError exception. The specific problem can be identified by using the FncClientError fields.

The code below shows a full example of polling detection’s historical data.

from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError

client_name = ''
api_token = ''
domain = ''
log_level = None

client: FncApiClient = FncClient.get_api_client(
    name=client_name, 
    domain=domain, 
    api_token=api_token
)
client.get_logger().set_level(level=log_level) 

try: 
    polling_args = {
      'polling_delay': 10, 
      'status':  'active', 
      'pull_muted_detections': 'false', 
      'pull_muted_rules':  'false', 
      'pull_muted_devices':  'false', 
      'include_description': True, 
      'include_signature': True, 
      'include_pdns': True, 
      'include_dhcp': True, 
      'include_events': True, 
      'filter_training_detections': True, 
      'limit': 500, 
      'start_date': '2024-01-01T00:00:00.000000Z'
    }

    # Split the poling interval in history and current
    h_context, context = client.get_splitted_context(args=polling_args) 

    # The polling args for poll_history should be the same as
    # for the continuous polling
    for response in client.poll_history(context=h_context, args=polling_args): 
      # Do Something... 
      client.get_logger().info(response) 

    # Ensure each iteration start without polling_args in the context
    h_context.clear_args()

    # This is the end of the iteration. It can be called in a loop until
    # completed or wait for some time between iterations. It only requires
    # the context with the history value

except FncClientError as e: 
    # Any exception will be reported as FncClientError. Specific Error
    # message will be added to the exception depending on its Error Type
    client.get_logger().error(e)

Continuous Polling Method

Once the context is split, we are ready to start pulling detections using the FncApiClient.continuous_polling method. This method retrieves detections since the timestamp stored in the context’s checkpoint or the provided start date. If none of those values are provided the start_date will be set to the current time. After the call is completed, the checkpoint value in the context is updated to the used end_date to avoid overlaps in the search windows.

The steps below show and describe the process for pulling detections continuously.

1. Imports

The main classes required while polling detections are shown below.

from fnc.fnc_client import FncClient
from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError
2. Get the Client

The FncApiClient is created using the FncClient class’s method and providing the required arguments. For a detailed description, see Getting the client.

3. Prepare Polling Arguments

When calling the continuous_polling method, we need to pass a dictionary with all the arguments. The arguments are used for filtering and enriching the detection’s information. For a detailed description, see . Polling Arguments in Polling Detections Continuously.

polling_args = {

'polling_delay': 10,

'status': 'active',

'pull_muted_detections': 'false',

'pull_muted_rules': 'false',

'pull_muted_devices': 'false',

'include_description': True,

'include_signature': True,

'include_pdns': True,

'include_dhcp': True,

'include_events': True,

'filter_training_detections': True,

'start_date': '' # Get configured Start Date or now as utc

}

4. Prepare the Context

When calling the continuous_polling method, we need to pass a context. Its checkpoint value will be used as the start_date for the search. It needs to be updated to the last call’s checkpoint if it has not been updated already.

context = ApiContext()
context.update_checkpoint(checkpoint= checkpoint)
5. Poll detections

Using the polling strategy described above, the continuous_polling method will perform the next search and retrieve the detections. The FncApiClient’s continuous_polling method is a generator function and need to be used within a loop.

for response in client.continuous_polling(context=context, args=polling_args):

# Do Something...

client.get_logger().info(response)

6. Persist checkpoint and clear Context’s Arguments

The polling_args value in the context contains the la arguments used in the last request. If they are present in the context while the polling method is called, they will be used. Therefore, they need to be cleared unless it is required to perform the same call as before.

# Persist checkpoint if needed
checkpoint = context.get_checkpoint() 

# Ensure each iteration start without polling_args in the context
context.clear_args()
7. Handle Errors

Any exception occurring while calling the continuous_polling method will be raised as a FncClientError exception. The specific problem can be identified by using the FncClientError fields.

The code below shows a full example of continuously polling detections.

from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError
from fnc.fnc_client import FncClient

client_name = ''
api_token = ''
domain = ''
log_level = None

try: 
    client: FncApiClient = FncClient.get_api_client(
      name=client_name, 
      domain=domain, 
      api_token=api_token
    ) 
    client.get_logger().set_level(level=log_level) 

    polling_args = {
      'polling_delay': 10, 
      'status':  'active', 
      'pull_muted_detections': 'false', 
      'pull_muted_rules':  'false', 
      'pull_muted_devices':  'false', 
      'include_description': True, 
      'include_signature': True, 
      'include_pdns': True, 
      'include_dhcp': True, 
      'include_events': True, 
      'filter_training_detections': True
    }

    checkpoint = # Get persisted checkpoint or ''
    start_date_str = # Get configured Start Date or now as utc

    polling_args['start_date'] = start_date_str

    context = ApiContext()
    context.update_checkpoint(checkpoint= checkpoint) 

    for response in client.continuous_polling(context=context, args=polling_args): 
      client.get_logger().info(response) 

    # Persist checkpoint if needed 
    checkpoint = context.get_checkpoint() 
    # Ensure each iteration start without polling_args in the context
    context.clear_args()

except FncClientError as e: 
    client.get_logger().error(e)