Skip to content

Commit

Permalink
Event ingestion flow
Browse files Browse the repository at this point in the history
- Scripts are stopped after 50 seconds so that the cron jobs, which start every minute, do not stack up if the work isn't done
- The start date for the 'time' parameter is reset to today whenever the tenant is changed
- The 'next' value is saved per tenant to avoid duplicate events when switching between tenants
- Retrieve 50 events per request to reduce the number of requests made to the Flare backend
- Added ruff configuration for imports
  • Loading branch information
Marc-Antoine Hinse committed Oct 31, 2024
1 parent 9444545 commit 8c78fbd
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 61 deletions.
24 changes: 24 additions & 0 deletions .ruff.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
extend-select = [
"I",
"C411",
"C413",
"C414",
"C416",
"C417",
"C419",
"RUF015",
]
extend-safe-fixes = [
"C411",
"C413",
"C414",
"C416",
"C417",
"C419",
"RUF015",
]
ignore = [
"E722",
"F403",
"F405",
]
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ lint: setup-web venv-tools mypy format-check

.PHONY: mypy
mypy: venv-tools
venv-tools/bin/mypy packages/flare
venv-tools/bin/mypy --config-file mypy.ini packages/flare

.PHONY: splunk-local
splunk-local: venv setup-web
Expand Down
10 changes: 10 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
[mypy]
namespace_packages = True
explicit_package_bases = True
ignore_missing_imports = True
warn_unused_ignores = True
warn_unused_configs = True
warn_redundant_casts = True
warn_no_return = True
no_implicit_optional = True
check_untyped_defs = True
strict_equality = True
exclude = (vendor*|flare/flare*)/$
follow_imports = skip
mypy_path = packages/flare/python/vendor
1 change: 0 additions & 1 deletion packages/configuration-screen/src/models/splunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ export interface SplunkApplicationNamespace {
export interface SplunkPassword {
name: string;
_properties: {
// eslint-disable-next-line camelcase
clear_password: string;
};
}
Expand Down
129 changes: 105 additions & 24 deletions packages/flare/python/cron_job_ingest_events.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import json
import sys
import os
import sys
from datetime import date, datetime
from typing import Optional

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor"))
import vendor.splunklib.client as client

from flare import FlareAPI

APP_NAME = "flare"
HOST = "localhost"
SPLUNK_PORT = 8089
REALM = APP_NAME + "_realm"
KV_COLLECTION_NAME = "feednext"
CRON_JOB_MAX_RUNNING_SECONDS = 50


def main():
# Cron jobs stack, so make sure to stop the current one before the next one starts
cron_job_start_timestamp = datetime.now().timestamp()
try:
# Example using a token
splunk_service = client.connect(
host=HOST,
port=SPLUNK_PORT,
Expand All @@ -29,46 +31,125 @@ def main():
raise Exception(str(e))

app: client.Application = splunk_service.apps[APP_NAME]
flare_api = FlareAPI(app=app)
create_collection(app=app)
api_key: Optional[str] = None
tenant_id: Optional[int] = None
for item in app.service.storage_passwords.list():
if item.content.username == "api_key":
api_key = item.clear_password

if item.content.username == "tenant_id":
tenant_id = (
int(item.clear_password) if item.clear_password is not None else None
)

from_ = get_from_value(app=app)
event_feed = flare_api.retrieve_feed(from_=from_)
set_from_value(app=app, next=event_feed["next"])
if not api_key:
raise Exception("API key not found")

if event_feed["items"]:
for item in event_feed["items"]:
print(json.dumps(item))
if not tenant_id:
raise Exception("Tenant ID not found")
flare_api = FlareAPI(app=app, api_key=api_key, tenant_id=tenant_id)

next = get_next(app=app, tenant_id=tenant_id)
start_date = get_start_date(app=app)
while True:
event_feed = flare_api.retrieve_feed(next=next, start_date=start_date)
save_start_date(app=app, tenant_id=tenant_id)
save_next(app=app, tenant_id=tenant_id, next=event_feed["next"])

if event_feed["items"]:
for item in event_feed["items"]:
print(json.dumps(item))

if (
int(datetime.now().timestamp() - cron_job_start_timestamp)
>= CRON_JOB_MAX_RUNNING_SECONDS
):
break
if event_feed["next"]:
next = event_feed["next"]
else:
break

def get_from_value(app: client.Application) -> Optional[str]:
from_: Optional[str] = None

def get_next(app: client.Application, tenant_id: int) -> Optional[str]:
next: Optional[str] = None
if KV_COLLECTION_NAME in app.service.kvstore:
data = app.service.kvstore[KV_COLLECTION_NAME].data.query()
if len(data) > 1:
from_ = data[0]["value"]
else:
app.service.kvstore.delete(KV_COLLECTION_NAME)
for entry in data:
if entry["_key"] == "next_{}".format(tenant_id):
next = entry["value"]
break

return next

return from_

def get_start_date(app: client.Application) -> Optional[str]:
    """Look up the stored start date in the KV store collection.

    Returns the ``value`` of the first queried entry whose ``_key`` is
    ``"start_date"``, or ``None`` when the collection does not exist or
    contains no such entry.
    """
    kvstore = app.service.kvstore
    if KV_COLLECTION_NAME not in kvstore:
        return None

    for record in kvstore[KV_COLLECTION_NAME].data.query():
        if record["_key"] == "start_date":
            return record["value"]

    return None

def set_from_value(app: client.Application, next: Optional[str]):

def get_current_tenant_id(app: client.Application) -> Optional[int]:
    """Read the tenant id stored under ``current_tenant_id`` in the KV store.

    Returns ``-1`` when the collection is missing, when no entry has that
    key, or when the stored value is empty; otherwise returns the stored
    value converted with ``int``.
    """
    kvstore = app.service.kvstore
    if KV_COLLECTION_NAME not in kvstore:
        return -1

    for record in kvstore[KV_COLLECTION_NAME].data.query():
        if record["_key"] != "current_tenant_id":
            continue
        # Only the first matching entry is considered.
        stored = record["value"]
        return int(stored) if stored else -1

    return -1


def create_collection(app: client.Application):
    """Create the KV store collection if it does not already exist.

    The collection is created with two string fields, ``_key`` and ``value``.
    """
    kvstore = app.service.kvstore
    if KV_COLLECTION_NAME in kvstore:
        return

    kvstore.create(
        name=KV_COLLECTION_NAME,
        fields={"_key": "string", "value": "string"},
    )
# Insert


def save_start_date(app: client.Application, tenant_id: int):
current_tenant_id = get_current_tenant_id(app=app)
# If this is the first request ever, insert today's date so that future requests will be based on that
if not get_start_date(app):
app.service.kvstore[KV_COLLECTION_NAME].data.insert(
json.dumps({"_key": "next", "value": next})
json.dumps(
{
"_key": "start_date",
"value": date.today().isoformat(),
}
)
)
elif not next:
app.service.kvstore[KV_COLLECTION_NAME].data.delete(id="next")
else:

# If the current tenant has changed, update the start date so that future requests will be based off today
# If you switch tenants, this will prevent the old tenant from ingesting all the events before today and the day
# that tenant was switched in the first place.
if current_tenant_id != tenant_id:
app.service.kvstore[KV_COLLECTION_NAME].data.update(
id="next", data=json.dumps({"value": next})
id="start_date", data=json.dumps({"value": date.today().isoformat()})
)


def save_next(app: client.Application, tenant_id: int, next: Optional[str]):
    """Persist the ``next`` value for the given tenant in the KV store.

    Nothing is written when ``next`` is falsy. Otherwise the value is stored
    under the key ``next_<tenant_id>``: inserted when ``get_next`` finds no
    value for that tenant, updated in place when it does.
    """
    if not next:
        return

    key = "next_{}".format(tenant_id)
    # Check for an existing value before touching the collection, so the
    # lookup happens first, as the insert/update decision depends on it.
    already_stored = bool(get_next(app, tenant_id=tenant_id))
    collection_data = app.service.kvstore[KV_COLLECTION_NAME].data

    if already_stored:
        collection_data.update(id=key, data=json.dumps({"value": next}))
    else:
        collection_data.insert(json.dumps({"_key": key, "value": next}))


if __name__ == "__main__":
main()
53 changes: 21 additions & 32 deletions packages/flare/python/flare.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from http.client import HTTPMessage
import typing as t
import sys

from vendor.requests.auth import AuthBase

import typing as t
from datetime import date
from http.client import HTTPMessage
from urllib.error import HTTPError
from vendor.flareio import FlareApiClient

import vendor.splunklib.client as client
from vendor.flareio import FlareApiClient
from vendor.requests.auth import AuthBase

APP_NAME = "flare"

Expand Down Expand Up @@ -36,41 +36,30 @@ def get_flare_api_client(


class FlareAPI(AuthBase):
def __init__(self, *, app: client.Application) -> None:
# Should be able to use app.service.storage_passwords.get(),
# but I can't seem to get that to work. list() works.
api_key: t.Optional[str] = None
tenant_id: t.Optional[int] = None
for item in app.service.storage_passwords.list():
if item.content.username == "api_key":
api_key = item.clear_password

if item.content.username == "tenant_id":
tenant_id = (
int(item.clear_password)
if item.clear_password is not None
else None
)

if not api_key:
raise Exception("API key not found")

def __init__(
self, *, app: client.Application, api_key: str, tenant_id: int
) -> None:
self.flare_endpoints = app.service.confs["flare"]["endpoints"]

self.flare_client = get_flare_api_client(
api_key=api_key,
tenant_id=tenant_id,
)

def retrieve_feed(self, *, from_: t.Optional[str] = None) -> dict[str, t.Any]:
def retrieve_feed(
self, *, next: t.Optional[str] = None, start_date: t.Optional[str] = None
) -> dict[str, t.Any]:
url = self.flare_endpoints["me_feed_endpoint"]
response = self.flare_client.post(
params: t.Dict[str, t.Any] = {"lite": True, "size": 50}
if next:
params["search_after"] = next
else:
params["time"] = "{}@".format(
start_date if start_date else date.today().isoformat()
)
response = self.flare_client.get(
url=url,
json={"lite": "true", "from": from_},
headers={
"Content-type": "application/json",
"Accept": "application/json",
},
params=params,
)

if response.status_code != 200:
Expand Down
7 changes: 4 additions & 3 deletions packages/flare/python/flare_external_requests.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import sys
import os
import splunk # type: ignore[import-not-found]
import json
import os
import sys
from typing import Any

import splunk

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor"))
from flareio import FlareApiClient

Expand Down

0 comments on commit 8c78fbd

Please sign in to comment.