2024.10.0

Fetch Events by Day

Given the large amount of information that might be stored in AWS S3 Buckets, it is recommended to retrieve events for one event type at a time and search for a period no longer than a day. This recommendation is enforced by the fetch_events_by_day() function. It allows you to fetch all raw events for a specified event type that were observed during a specified day.

Keep the following considerations in mind:
  • The day is specified as a timestamp. Provide the time zone information with it; if none is provided, UTC is assumed by default.
  • This is a generator function. It produces a series of events that can be consumed in a for-loop or retrieved one at a time with the next() function.
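
The two ways of consuming a generator mentioned above can be sketched without the FNC client. In the block below, `fake_fetch_events_by_day` is a hypothetical stand-in, not part of the library. It assumes that each item produced is a list of events, as the example at the end of this section suggests, and it mirrors the documented default of treating a timestamp without time zone information as UTC.

```python
from datetime import datetime, timezone
from typing import Iterator, List


def fake_fetch_events_by_day(day: datetime, event_type: str) -> Iterator[List[dict]]:
    """Hypothetical stand-in for fetch_events_by_day(); yields events in pieces."""
    # Mirror the documented default: a naive datetime is treated as UTC.
    if day.tzinfo is None:
        day = day.replace(tzinfo=timezone.utc)
    for piece in range(3):
        yield [{"type": event_type, "day": day.date().isoformat(), "piece": piece}]


day = datetime(2024, 10, 1)  # no time zone given, so UTC is assumed

# Style 1: consume every piece in a for-loop.
pieces_seen = 0
for events in fake_fetch_events_by_day(day, "observation"):
    pieces_seen += 1

# Style 2: pull one piece at a time with next().
generator = fake_fetch_events_by_day(day, "observation")
first = next(generator)
second = next(generator)
third = next(generator)
# Passing a default to next() avoids StopIteration once the generator is exhausted.
exhausted = next(generator, None)
```

The for-loop is usually the simpler choice. Calling next() directly is useful when the caller wants to decide after each piece whether to continue.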

In this section, we show how to use the FncMetastreamClient to fetch events for a specific day. The steps involved are:

1. Imports

The main classes required to fetch events are shown below.

from fnc.fnc_client import FncClient
from fnc.metastream import FncMetastreamClient
from fnc.errors import FncClientError

2. Get the Client

The FncMetastreamClient is created by calling the FncClient.get_metastream_client method with the required arguments. For a detailed description, see Getting the client.

3. Fetch events

The FncMetastreamClient.fetch_events_by_day method is used to fetch events for a specific day. The method requires two arguments, day and event_type. The limit and context arguments are optional.

client.fetch_events_by_day(
    day: datetime, 
    event_type: str, 
    limit: int = 0, 
    context: MetastreamContext = None
)

Property | Type | Required | Description
-------- | ---- | -------- | -----------
day | datetime | true | The day to download events from. Time is ignored if given. UTC is assumed if time zone information is not provided. Must be within the last 7 days.
event_type | string | true | The event type to download. Possible values are observation, suricata.
limit | int | false | The maximum number of events to fetch. Must be between 1 and 10000.
context | MetastreamContext | false | An object that stores specific session-wide data such as metrics and checkpoint.
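
The constraints listed above can be checked before calling the client. The block below is a minimal sketch, not part of the library: `check_fetch_args` and `ALLOWED_EVENT_TYPES` are hypothetical names. It assumes that "within the last 7 days" is measured in whole UTC days and that future days are rejected, and it treats an omitted limit as None because the meaning of the default value 0 is not stated here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Event types listed in the table above.
ALLOWED_EVENT_TYPES = {"observation", "suricata"}


def check_fetch_args(day: datetime, event_type: str,
                     limit: Optional[int] = None,
                     now: Optional[datetime] = None) -> datetime:
    """Hypothetical pre-flight check; returns the start of the day in UTC."""
    now = now or datetime.now(timezone.utc)
    # UTC is assumed when no time zone information is provided.
    if day.tzinfo is None:
        day = day.replace(tzinfo=timezone.utc)
    # The time of day is ignored, so truncate to midnight (assumption: in UTC).
    day_start = day.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    today_start = now.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    age = today_start - day_start
    if age < timedelta(0) or age > timedelta(days=7):
        raise ValueError("day must be within the last 7 days")
    if event_type not in ALLOWED_EVENT_TYPES:
        raise ValueError(f"unsupported event_type: {event_type!r}")
    if limit is not None and not 1 <= limit <= 10000:
        raise ValueError("limit must be between 1 and 10000")
    return day_start


# Fixed reference time so the result is reproducible.
now = datetime(2024, 10, 10, 12, 0, tzinfo=timezone.utc)
normalized = check_fetch_args(datetime(2024, 10, 8, 15, 30), "observation",
                              limit=100, now=now)
```

Checking the arguments locally gives an immediate, specific message instead of an error raised part-way through a fetch. The client may apply its own rules, so this check is a convenience rather than a replacement for handling FncClientError.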

4. Handle Errors

Any exception occurring while fetching the events will be raised as a FncClientError exception. The specific problem can be identified by using the FncClientError fields.
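
Because the function is a generator, an error is likely to surface while the results are being iterated rather than when the function is first called, so the try block should enclose the whole loop. The sketch below illustrates this with stand-ins only: `FakeClientError` and `failing_fetch` are hypothetical and do not reflect the fields or behavior of FncClientError.

```python
from typing import Iterator, List


class FakeClientError(Exception):
    """Hypothetical stand-in for FncClientError."""


def failing_fetch() -> Iterator[List[dict]]:
    """Hypothetical generator that fails after yielding two pieces."""
    yield [{"id": 1}, {"id": 2}]
    yield [{"id": 3}]
    raise FakeClientError("simulated failure while fetching")


# Creating the generator runs none of its body, so no error is raised here.
generator = failing_fetch()

processed = 0
error_message = None
try:
    # The error is raised inside the loop, after some events were processed.
    for events in generator:
        processed += len(events)
except FakeClientError as e:
    error_message = str(e)
```

Keeping a running count outside the try block, as `processed` does here, preserves the progress made before the failure.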

Example

The code below retrieves the Observations for the day two days before the current date (UTC). It iterates until all the Observations for that day have been retrieved and prints the size of each piece.

from datetime import datetime, timedelta, timezone

from fnc.errors import FncClientError
from fnc.fnc_client import FncClient
from fnc.metastream import FncMetastreamClient

# Fill in the values for your account before running.
client_name = ''
access_key = ''
secret_key = ''
account_code = ''
bucket_name = ''
log_level = None
client = None

try:
    client: FncMetastreamClient = FncClient.get_metastream_client(
        name=client_name,
        access_key=access_key,
        secret_key=secret_key,
        account_code=account_code,
        bucket_name=bucket_name
    )
    client.get_logger().set_level(level=log_level)

    # The day to fetch: two days before now, in UTC.
    day = datetime.now(timezone.utc) - timedelta(days=2)

    for events in client.fetch_events_by_day(event_type='observation', day=day):
        # Process each piece of events here.
        print(f'num events: {len(events)}')

except FncClientError as e:
    # client is still None if get_metastream_client() itself failed.
    if client is not None:
        client.get_logger().error(e)
    else:
        print(e)
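
To cover more than one day while still following the one-day, one-event-type recommendation, the same call can be repeated per day and per event type. The block below is a sketch under assumptions: `last_days`, `count_events`, and `fake_fetch` are hypothetical helpers, and `fake_fetch` stands in for the client so the block runs on its own. With a real client, `client.fetch_events_by_day` could presumably be passed as `fetch`, since it is called here with the same `day` and `event_type` keywords used in the example above.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, Iterable, Iterator, List


def last_days(count: int, now: datetime) -> List[datetime]:
    """Return the start (UTC) of each of the `count` days before today, oldest first."""
    today = now.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    return [today - timedelta(days=offset) for offset in range(count, 0, -1)]


def count_events(fetch: Callable[..., Iterable[List[dict]]],
                 days: List[datetime],
                 event_types: List[str]) -> Dict[str, int]:
    """Fetch one event type and one day at a time, totalling events per type."""
    totals: Dict[str, int] = {}
    for event_type in event_types:
        for day in days:
            for events in fetch(day=day, event_type=event_type):
                totals[event_type] = totals.get(event_type, 0) + len(events)
    return totals


def fake_fetch(day: datetime, event_type: str) -> Iterator[List[dict]]:
    """Hypothetical stand-in for the client; yields one piece of two events."""
    yield [{"day": day.date().isoformat(), "type": event_type}] * 2


# Fixed reference time so the result is reproducible.
now = datetime(2024, 10, 10, 12, 0, tzinfo=timezone.utc)
days = last_days(2, now)
totals = count_events(fake_fetch, days, ["observation", "suricata"])
```

Passing the fetch function in as an argument keeps the looping logic testable without credentials or network access.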
