Polling Detections Continuously
In this section, we show how to use the FncApiClient to continuously poll detections.
Polling Arguments
When polling detections continuously, you can enrich or filter the retrieved information through the arguments passed to the polling method as a dictionary. The allowed arguments are described below.
polling_args = {
    'polling_delay': 10,
    'status': 'active',
    'pull_muted_detections': 'false',
    'pull_muted_rules': 'false',
    'pull_muted_devices': 'false',
    'include_description': True,
    'include_signature': True,
    'include_pdns': True,
    'include_dhcp': True,
    'include_events': True,
    'filter_training_detections': True,
    'start_date': ''
}
Attribute | Type | Default | Description |
---|---|---|---|
polling_delay | int | 10 | Delay, expressed in minutes, applied when retrieving detections so that the backend services have time to process them. The recommended value is 10 minutes, which is the default. |
status | string | 'active' | Filters the retrieved detections by status (active or resolved). Only active detections are retrieved by default. |
pull_muted_detections, pull_muted_rules, pull_muted_devices | string | 'false' | Filter the retrieved detections depending on whether they are muted. The supported values are 'false', 'true', and 'all'. |
include_description, include_signature, include_pdns, include_dhcp, include_events | bool | False | Enrich the retrieved information. If set to True, the method includes the rule's description or signature, the entity's PDNS or DHCP information, and the detection's associated events. |
filter_training_detections | bool | True | Filters out training detections. Set it to False if training detections are required. |
start_date | datetime | now | Sets the point in time from which detections are retrieved. This timestamp is used only if no checkpoint is provided in the context. |
limit | int | | Limits the number of detections retrieved with each piece of historical data. It should not be used for continuous polling, or some detections might be missed. |
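As an illustration of how these arguments fit together, here is a minimal sketch of a helper that merges caller overrides into the documented defaults and rejects unknown keys. The helper name build_polling_args and its validation rules are hypothetical and not part of the client library; only the argument names and default values come from the description above.

```python
# Defaults as documented above; '' for start_date mirrors the sample dictionary.
DEFAULT_POLLING_ARGS = {
    'polling_delay': 10,
    'status': 'active',
    'pull_muted_detections': 'false',
    'pull_muted_rules': 'false',
    'pull_muted_devices': 'false',
    'include_description': False,
    'include_signature': False,
    'include_pdns': False,
    'include_dhcp': False,
    'include_events': False,
    'filter_training_detections': True,
    'start_date': '',
}

MUTED_VALUES = ('false', 'true', 'all')


def build_polling_args(**overrides):
    """Hypothetical helper: merge overrides into the defaults and validate them."""
    allowed = set(DEFAULT_POLLING_ARGS) | {'limit'}
    unknown = set(overrides) - allowed
    if unknown:
        raise ValueError(f'Unknown polling arguments: {sorted(unknown)}')
    args = {**DEFAULT_POLLING_ARGS, **overrides}
    for key in ('pull_muted_detections', 'pull_muted_rules', 'pull_muted_devices'):
        if args[key] not in MUTED_VALUES:
            raise ValueError(f'{key} must be one of {MUTED_VALUES}')
    return args


polling_args = build_polling_args(include_events=True, status='resolved')
```

Validating the dictionary before it reaches the polling method catches misspelled keys early, which is useful because a misspelled key would otherwise simply not match any documented argument.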
API Context
The context used during the continuous polling provides information about what was already retrieved and what needs to be retrieved. It holds three pieces of information:
Attribute | Type | Description |
---|---|---|
checkpoint | string | The value contained in the checkpoint, was the |
polling_args | dictionary | After the polling method is called, the context will update the |
history | dictionary | The FncClient library can pull historical data when polling detections. However, the amount of historical data may be large enough to delay the retrieval of current data. To avoid this delay, the client library lets you split the context into historical and current data so that the two can be retrieved separately. If the history value is provided in the context, historical data is being retrieved. |
Splitting Historical and Current Data
The client library provides a method (get_splitted_context) that returns two contexts: one with the history value set to pull data from the provided start_date up to now, and a second one without a history value and with the checkpoint set to now. In this way, we can pull the historical data using the first context while pulling the current data using the second one.
Example:
# Split the polling interval into history and current
h_context, context = client.get_splitted_context(args=polling_args)
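To make the idea of the split concrete, the following sketch reproduces it with plain dictionaries instead of the library's context objects. The function name split_polling_window and the dictionary layout are assumptions made for illustration; the point shown is only that both contexts share a single boundary timestamp.

```python
from datetime import datetime, timezone


def split_polling_window(start_date, now=None):
    """Hypothetical sketch: return (history, current) sharing one boundary."""
    now = now or datetime.now(timezone.utc)
    if start_date > now:
        raise ValueError('start_date must not be in the future')
    # The history window ends exactly where the current polling begins.
    history = {'history': {'start_date': start_date, 'end_date': now}}
    current = {'checkpoint': now}
    return history, current


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
fixed_now = datetime(2024, 1, 10, tzinfo=timezone.utc)
h_ctx, ctx = split_polling_window(start, now=fixed_now)
```

Because the end of the history window and the checkpoint are the same timestamp, no period is polled twice and none is skipped.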
Poll History Method
Once the context has been split, we are ready to start pulling the historical data using the FncApiClient.poll_history method. This method retrieves a piece of the historical data. The size of the piece is determined by the interval argument, which defaults to one day. If the limit is not reached, the method pulls the next piece, and so on until the limit is reached or exceeded. Every time the poll_history method is called, it updates the start_date in the context's history value until it reaches the end_date. At that point, all the historical data has been polled.
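The piece-by-piece behaviour described here can be sketched as a small generator. This is not the library's implementation: poll_history_sketch, the fetch callable, and the plain history dictionary are hypothetical stand-ins used to show how the interval, the limit, and the moving start_date interact.

```python
from datetime import datetime, timedelta, timezone


def poll_history_sketch(history, fetch, interval=timedelta(days=1), limit=500):
    """Yield pieces of historical data and advance history['start_date'].

    fetch(start, end) is a hypothetical callable returning a list of detections.
    """
    retrieved = 0
    while history['start_date'] < history['end_date'] and retrieved < limit:
        piece_end = min(history['start_date'] + interval, history['end_date'])
        detections = fetch(history['start_date'], piece_end)
        retrieved += len(detections)
        # Move the window forward so the next call resumes where this one stopped.
        history['start_date'] = piece_end
        yield detections


def fake_fetch(start, end):
    # Pretend that every piece holds 300 detections.
    return ['detection'] * 300


history = {
    'start_date': datetime(2024, 1, 1, tzinfo=timezone.utc),
    'end_date': datetime(2024, 1, 4, tzinfo=timezone.utc),
}
first_call = list(poll_history_sketch(history, fake_fetch))
after_first = history['start_date']
second_call = list(poll_history_sketch(history, fake_fetch))
```

In this run the first call yields two pieces, because the limit of 500 is exceeded only after the second one, and the second call yields the remaining piece and brings start_date up to end_date.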
The steps below show and describe the process for pulling the historical data.
1. Imports
The main classes required when polling historical data are shown below.
from datetime import timedelta
from fnc.fnc_client import FncClient
from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError
from fnc.utils import datetime_to_utc_str
2. Get the Client
The FncApiClient is created using the FncClient class's get_api_client method and providing the required arguments. For a detailed description, see Getting the client.
3. Prepare Polling Arguments and Context
When calling the poll_history method, we need to pass a dictionary with all the arguments. The arguments are used for filtering and enriching the detection's information. For a detailed description, see Polling Arguments in Polling Detections Continuously.
polling_args = {
    'polling_delay': 10,
    'status': 'active',
    'pull_muted_detections': 'false',
    'pull_muted_rules': 'false',
    'pull_muted_devices': 'false',
    'include_description': True,
    'include_signature': True,
    'include_pdns': True,
    'include_dhcp': True,
    'include_events': True,
    'filter_training_detections': True,
    'limit': 500,
    'start_date': ''
}
4. Split the Context
We also need to pass the context with the history value to the poll_history method.
# Split the polling interval into history and current
h_context, context = client.get_splitted_context(args=polling_args)
The history field in the context contains the start and end dates of the period to be pulled. It can be created manually, but using the method above avoids duplicates, since the end date of the history context is the checkpoint of the current one. However, the method can only be used once. After that, if the context needs to be recreated, it must be updated manually with the last history value.
5. Poll the next piece of historical data
The FncApiClient's poll_history method is used to poll historical data. This method is a generator function and needs to be used within a loop.
# The args for poll_history should be the same as for the continuous polling
for response in client.poll_history(
    context=h_context,
    args=polling_args,
    interval=timedelta(days=1)
):
    # Do Something...
    client.get_logger().info(response)
6. Clear Context’s Arguments
The polling_args value in the context contains the arguments used in the last request. If they are present in the context when the polling method is called, they will be used. Therefore, they need to be cleared unless the same call as before must be repeated.
# Ensure each iteration starts without polling_args in the context
h_context.clear_args()
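The effect of leaving arguments in the context can be illustrated with a toy model. ToyContext and effective_args are hypothetical and only mimic the behaviour described above (stored arguments take precedence over newly passed ones); how the library stores its arguments internally is not shown in this document.

```python
class ToyContext:
    """Hypothetical stand-in for the library's context; not the real ApiContext."""

    def __init__(self):
        self.polling_args = {}

    def clear_args(self):
        self.polling_args = {}


def effective_args(context, args):
    """Return the arguments a call would use: stored ones win over new ones."""
    if not context.polling_args:
        context.polling_args = dict(args)
    return context.polling_args


ctx = ToyContext()
first = effective_args(ctx, {'status': 'active'})
# Without clearing, the stored arguments from the first call are reused.
stale = effective_args(ctx, {'status': 'resolved'})
ctx.clear_args()
# After clearing, the newly passed arguments take effect.
fresh = effective_args(ctx, {'status': 'resolved'})
```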
7. Handle Errors
Any exception occurring while calling the poll_history method will be raised as an FncClientError exception. The specific problem can be identified by using the FncClientError fields.
The code below shows a full example of polling historical detection data.
from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError
from fnc.fnc_client import FncClient

client_name = ''
api_token = ''
domain = ''
log_level = None

client: FncApiClient = FncClient.get_api_client(
    name=client_name,
    domain=domain,
    api_token=api_token
)
client.get_logger().set_level(level=log_level)

try:
    polling_args = {
        'polling_delay': 10,
        'status': 'active',
        'pull_muted_detections': 'false',
        'pull_muted_rules': 'false',
        'pull_muted_devices': 'false',
        'include_description': True,
        'include_signature': True,
        'include_pdns': True,
        'include_dhcp': True,
        'include_events': True,
        'filter_training_detections': True,
        'limit': 500,
        'start_date': '2024-01-01T00:00:00.000000Z'
    }

    # Split the polling interval into history and current
    h_context, context = client.get_splitted_context(args=polling_args)

    # The polling args for poll_history should be the same as
    # for the continuous polling
    for response in client.poll_history(context=h_context, args=polling_args):
        # Do Something...
        client.get_logger().info(response)

    # Ensure each iteration starts without polling_args in the context
    h_context.clear_args()

    # This is the end of the iteration. It can be called in a loop until
    # completed or wait for some time between iterations. It only requires
    # the context with the history value

except FncClientError as e:
    # Any exception will be reported as FncClientError. Specific Error
    # message will be added to the exception depending on its Error Type
    client.get_logger().error(e)
Continuous Polling Method
Once the context is split, we are ready to start pulling detections using the FncApiClient.continuous_polling method. This method retrieves detections since the timestamp stored in the context's checkpoint or, if there is no checkpoint, since the provided start date. If neither value is provided, the start_date is set to the current time. After the call is completed, the checkpoint value in the context is updated to the end_date that was used, to avoid overlaps in the search windows.
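The window selection described above can be sketched as follows. The function next_search_window is hypothetical, and two points are assumptions rather than documented behaviour: that polling_delay is subtracted from the current time to obtain the end of the window, and that an empty window is returned when the start is more recent than that delayed end.

```python
from datetime import datetime, timedelta, timezone


def next_search_window(checkpoint, start_date, polling_delay=10, now=None):
    """Return (start, end) for the next search; end becomes the new checkpoint."""
    now = now or datetime.now(timezone.utc)
    # Precedence described above: checkpoint, then start_date, then the current time.
    start = checkpoint or start_date or now
    # Assumption: the end of the window is held back by the polling delay (minutes).
    end = now - timedelta(minutes=polling_delay)
    # Assumption: return an empty window if the start is not old enough yet.
    if end < start:
        end = start
    return start, end


noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
configured_start = datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc)

# First call: no checkpoint yet, so the configured start date is used.
start, end = next_search_window(None, configured_start, now=noon)

# Second call, five minutes later: the previous end is now the checkpoint.
next_start, next_end = next_search_window(end, None, now=noon + timedelta(minutes=5))
```

Since each window starts exactly where the previous one ended, consecutive searches neither overlap nor leave gaps.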
The steps below show and describe the process for pulling detections continuously.
1. Imports
The main classes required while polling detections are shown below.
from fnc.fnc_client import FncClient
from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError
2. Get the Client
The FncApiClient is created using the FncClient class's get_api_client method and providing the required arguments. For a detailed description, see Getting the client.
3. Prepare Polling Arguments
When calling the continuous_polling method, we need to pass a dictionary with all the arguments. The arguments are used for filtering and enriching the detection's information. For a detailed description, see Polling Arguments in Polling Detections Continuously.
polling_args = {
    'polling_delay': 10,
    'status': 'active',
    'pull_muted_detections': 'false',
    'pull_muted_rules': 'false',
    'pull_muted_devices': 'false',
    'include_description': True,
    'include_signature': True,
    'include_pdns': True,
    'include_dhcp': True,
    'include_events': True,
    'filter_training_detections': True,
    'start_date': ''  # Get configured Start Date or now as utc
}
4. Prepare the Context
When calling the continuous_polling method, we need to pass a context. Its checkpoint value will be used as the start_date for the search. If the context does not already hold the checkpoint from the last call, it needs to be updated with it.
# checkpoint holds the value persisted after the last call
context = ApiContext()
context.update_checkpoint(checkpoint=checkpoint)
5. Poll detections
Using the polling strategy described above, the continuous_polling method will perform the next search and retrieve the detections. The FncApiClient's continuous_polling method is a generator function and needs to be used within a loop.
for response in client.continuous_polling(context=context, args=polling_args):
    # Do Something...
    client.get_logger().info(response)
6. Persist checkpoint and clear Context’s Arguments
The polling_args value in the context contains the arguments used in the last request. If they are present in the context when the polling method is called, they will be used. Therefore, they need to be cleared unless the same call as before must be repeated.
# Persist checkpoint if needed
checkpoint = context.get_checkpoint()
# Ensure each iteration starts without polling_args in the context
context.clear_args()
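How the checkpoint is persisted is left to the integration. One possible approach, sketched below, stores it in a small JSON file. The helpers load_checkpoint and save_checkpoint and the file layout are hypothetical and not part of the client library.

```python
import json
from pathlib import Path


def load_checkpoint(path):
    """Return the stored checkpoint, or '' when nothing has been stored yet."""
    file = Path(path)
    if not file.exists():
        return ''
    return json.loads(file.read_text(encoding='utf-8')).get('checkpoint', '')


def save_checkpoint(path, checkpoint):
    """Write the checkpoint to a temporary file, then rename it into place."""
    file = Path(path)
    tmp = file.with_name(file.name + '.tmp')
    tmp.write_text(json.dumps({'checkpoint': checkpoint}), encoding='utf-8')
    # Renaming avoids leaving a half-written file if the process is interrupted.
    tmp.replace(file)
```

With these helpers, the value returned by load_checkpoint can be passed to context.update_checkpoint before polling, and the value returned by context.get_checkpoint can be passed to save_checkpoint once the polling loop has finished.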
7. Handle Errors
Any exception occurring while calling the continuous_polling method will be raised as an FncClientError exception. The specific problem can be identified by using the FncClientError fields.
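For transient failures, an integration may want to retry the call instead of stopping. The sketch below shows one way to do that with exponential backoff. PollingError is a stand-in for FncClientError so that the example runs without the library, and poll_with_retries, max_attempts, and base_delay are hypothetical names chosen for illustration.

```python
import time


class PollingError(Exception):
    """Stand-in for the library's FncClientError (hypothetical, for illustration)."""


def poll_with_retries(poll_once, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call poll_once(); on PollingError wait and retry, re-raising on the last attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return poll_once()
        except PollingError:
            if attempt == max_attempts:
                raise
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))


attempts = []
waits = []


def flaky_poll():
    # Fails twice, then succeeds, to exercise the retry path.
    attempts.append(1)
    if len(attempts) < 3:
        raise PollingError('temporary failure')
    return 'ok'


result = poll_with_retries(flaky_poll, sleep=waits.append)
```

In a real integration, poll_once would wrap the whole polling loop and catch FncClientError instead of the stand-in exception.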
The code below shows a full example of continuously polling detections.
from fnc.api import ApiContext, FncApiClient
from fnc.errors import FncClientError
from fnc.fnc_client import FncClient

client_name = ''
api_token = ''
domain = ''
log_level = None

try:
    client: FncApiClient = FncClient.get_api_client(
        name=client_name,
        domain=domain,
        api_token=api_token
    )
    client.get_logger().set_level(level=log_level)

    polling_args = {
        'polling_delay': 10,
        'status': 'active',
        'pull_muted_detections': 'false',
        'pull_muted_rules': 'false',
        'pull_muted_devices': 'false',
        'include_description': True,
        'include_signature': True,
        'include_pdns': True,
        'include_dhcp': True,
        'include_events': True,
        'filter_training_detections': True
    }

    checkpoint = ''  # Get persisted checkpoint or ''
    start_date_str = ''  # Get configured Start Date or now as utc
    polling_args['start_date'] = start_date_str

    context = ApiContext()
    context.update_checkpoint(checkpoint=checkpoint)

    for response in client.continuous_polling(context=context, args=polling_args):
        client.get_logger().info(response)

    # Persist checkpoint if needed
    checkpoint = context.get_checkpoint()

    # Ensure each iteration starts without polling_args in the context
    context.clear_args()

except FncClientError as e:
    client.get_logger().error(e)