Skip to content

Commit

Permalink
Event ingestion flow
Browse files Browse the repository at this point in the history
- Scripts are killed after 58 seconds to avoid stacking the cron jobs every minute if the work isn't done
- The start date for the 'time' parameter is reset to today whenever the tenant is changed
- The 'next' value is saved per tenant to avoid duplicate events when switching between tenants
- Retrieving 50 events per request to reduce the number of requests made to the Flare backend
  • Loading branch information
Marc-Antoine Hinse committed Oct 31, 2024
1 parent 9444545 commit 3c8b9da
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 46 deletions.
116 changes: 96 additions & 20 deletions packages/flare/python/cron_job_ingest_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
import os
from typing import Optional
from datetime import date, datetime

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor"))
import vendor.splunklib.client as client
Expand All @@ -13,11 +14,13 @@
SPLUNK_PORT = 8089
REALM = APP_NAME + "_realm"
KV_COLLECTION_NAME = "feednext"
CRON_JOB_MAX_RUNNING_SECONDS = 58


def main():
# Cron jobs stack, so make sure the current one stops before the next one starts
cron_job_start_timestamp = datetime.now().timestamp()
try:
# Example using a token
splunk_service = client.connect(
host=HOST,
port=SPLUNK_PORT,
Expand All @@ -29,46 +32,119 @@ def main():
raise Exception(str(e))

app: client.Application = splunk_service.apps[APP_NAME]
flare_api = FlareAPI(app=app)
api_key: Optional[str] = None
tenant_id: Optional[int] = None
for item in app.service.storage_passwords.list():
if item.content.username == "api_key":
api_key = item.clear_password

from_ = get_from_value(app=app)
event_feed = flare_api.retrieve_feed(from_=from_)
set_from_value(app=app, next=event_feed["next"])
if item.content.username == "tenant_id":
tenant_id = (
int(item.clear_password) if item.clear_password is not None else None
)

if event_feed["items"]:
for item in event_feed["items"]:
print(json.dumps(item))
if not api_key:
raise Exception("API key not found")

if not tenant_id:
raise Exception("Tenant ID not found")
flare_api = FlareAPI(app=app, api_key=api_key, tenant_id=tenant_id)

def get_from_value(app: client.Application) -> Optional[str]:
from_ = get_from_value(app=app, tenant_id=tenant_id)
start_date = get_start_date_value(app=app)
while True:
event_feed = flare_api.retrieve_feed(from_=from_, start_date=start_date)
save_values(app=app, tenant_id=tenant_id, next=event_feed["next"])

if event_feed["items"]:
for item in event_feed["items"]:
print(json.dumps(item))

if (
int(datetime.now().timestamp() - cron_job_start_timestamp)
>= CRON_JOB_MAX_RUNNING_SECONDS
):
break
if event_feed["next"]:
from_ = event_feed["next"]
else:
break


def get_from_value(app: client.Application, tenant_id: int) -> Optional[str]:
from_: Optional[str] = None
if KV_COLLECTION_NAME in app.service.kvstore:
data = app.service.kvstore[KV_COLLECTION_NAME].data.query()
if len(data) > 1:
from_ = data[0]["value"]
else:
app.service.kvstore.delete(KV_COLLECTION_NAME)
for entry in data:
if entry["_key"] == "next_{}".format(tenant_id):
from_ = entry["value"]
break

return from_


def set_from_value(app: client.Application, next: Optional[str]):
def get_start_date_value(app: client.Application) -> Optional[str]:
    """Return the stored ``start_date`` value from the KV collection.

    Queries the ``KV_COLLECTION_NAME`` collection and returns the ``value``
    of the first entry whose ``_key`` is ``"start_date"``.

    Returns:
        The stored value, or ``None`` when the collection does not exist or
        contains no ``start_date`` entry.
    """
    # Nothing has been persisted yet if the collection is missing.
    if KV_COLLECTION_NAME not in app.service.kvstore:
        return None

    records = app.service.kvstore[KV_COLLECTION_NAME].data.query()
    return next(
        (record["value"] for record in records if record["_key"] == "start_date"),
        None,
    )


def get_current_tenant_id_value(app: client.Application) -> Optional[int]:
    """Return the tenant ID stored under ``current_tenant_id`` in the KV collection.

    Queries the ``KV_COLLECTION_NAME`` collection for the entry whose ``_key``
    is ``"current_tenant_id"`` and converts its ``value`` to ``int``.

    Returns:
        The stored tenant ID, or ``-1`` when the collection does not exist,
        the entry is absent, or its value is empty.
    """
    # Without the collection there is no recorded tenant; fall back to -1.
    if KV_COLLECTION_NAME not in app.service.kvstore:
        return -1

    for record in app.service.kvstore[KV_COLLECTION_NAME].data.query():
        if record["_key"] != "current_tenant_id":
            continue
        # Only the first matching entry is considered; an empty value maps to -1.
        if record["value"]:
            return int(record["value"])
        return -1

    return -1


def save_values(app: client.Application, tenant_id: int, next: Optional[str]):
if KV_COLLECTION_NAME not in app.service.kvstore:
# Create the collection
app.service.kvstore.create(
name=KV_COLLECTION_NAME, fields={"_key": "string", "value": "string"}
)
# Insert

current_tenant_id = get_current_tenant_id_value(app=app)
# If this is the first request ever, insert today's date so that future requests will be based on that
if not get_start_date_value(app):
app.service.kvstore[KV_COLLECTION_NAME].data.insert(
json.dumps({"_key": "next", "value": next})
json.dumps(
{
"_key": "start_date",
"value": date.today().isoformat(),
}
)
)
elif not next:
app.service.kvstore[KV_COLLECTION_NAME].data.delete(id="next")
else:

# If the current tenant has changed, update the start date so that future requests will be based off today
# If you switch tenants, this prevents the old tenant from ingesting all the events between the day that tenant
# was switched in the first place and today.
if current_tenant_id != tenant_id:
app.service.kvstore[KV_COLLECTION_NAME].data.update(
id="next", data=json.dumps({"value": next})
id="start_date", data=json.dumps({"value": date.today().isoformat()})
)

# If we have a new next value, update the collection for that tenant to continue searching from that point
if next:
if not get_from_value(app, tenant_id=tenant_id):
app.service.kvstore[KV_COLLECTION_NAME].data.insert(
json.dumps({"_key": "next_{}".format(tenant_id), "value": next})
)
else:
app.service.kvstore[KV_COLLECTION_NAME].data.update(
id="next_{}".format(tenant_id), data=json.dumps({"value": next})
)


if __name__ == "__main__":
main()
42 changes: 16 additions & 26 deletions packages/flare/python/flare.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from urllib.error import HTTPError
from vendor.flareio import FlareApiClient
import vendor.splunklib.client as client
from datetime import date

APP_NAME = "flare"

Expand Down Expand Up @@ -36,41 +37,30 @@ def get_flare_api_client(


class FlareAPI(AuthBase):
def __init__(self, *, app: client.Application) -> None:
# Should be able to use app.service.storage_passwords.get(),
# but I can't seem to get that to work. list() works.
api_key: t.Optional[str] = None
tenant_id: t.Optional[int] = None
for item in app.service.storage_passwords.list():
if item.content.username == "api_key":
api_key = item.clear_password

if item.content.username == "tenant_id":
tenant_id = (
int(item.clear_password)
if item.clear_password is not None
else None
)

if not api_key:
raise Exception("API key not found")

def __init__(
self, *, app: client.Application, api_key: str, tenant_id: int
) -> None:
self.flare_endpoints = app.service.confs["flare"]["endpoints"]

self.flare_client = get_flare_api_client(
api_key=api_key,
tenant_id=tenant_id,
)

def retrieve_feed(self, *, from_: t.Optional[str] = None) -> dict[str, t.Any]:
def retrieve_feed(
self, *, from_: t.Optional[str] = None, start_date: t.Optional[str] = None
) -> dict[str, t.Any]:
url = self.flare_endpoints["me_feed_endpoint"]
response = self.flare_client.post(
params: t.Dict[str, t.Any] = {"lite": True, "size": 50}
if from_:
params["from"] = from_
else:
params["time"] = "{}@".format(
start_date if start_date else date.today().isoformat()
)
response = self.flare_client.get(
url=url,
json={"lite": "true", "from": from_},
headers={
"Content-type": "application/json",
"Accept": "application/json",
},
params=params,
)

if response.status_code != 200:
Expand Down

0 comments on commit 3c8b9da

Please sign in to comment.