Skip to content

Commit

Permalink
Event ingestion flow
Browse files Browse the repository at this point in the history
- Scripts are killed after 50 seconds so that the cron jobs, which start every minute, do not stack up when the work isn't finished
- The start date for the 'time' parameter is reset to today whenever the tenant is changed
- The 'next' value is saved per tenant to avoid duplicate events when switching between tenants
- Retrieving 50 events per request to reduce the number of requests made to the Flare backend
- Added ruff configuration for imports
- Added isort for linting
  • Loading branch information
Marc-Antoine Hinse committed Oct 31, 2024
1 parent 9444545 commit cf66e1e
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 60 deletions.
40 changes: 40 additions & 0 deletions .ruff.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
extend-select = [
"I",
"C411",
"C413",
"C414",
"C416",
"C417",
"C419",
"RUF015",
"E401",
]
extend-safe-fixes = [
"C411",
"C413",
"C414",
"C416",
"C417",
"C419",
"RUF015",
"E401",
]
ignore = [
"E722",
"F403",
"F405",
]

[isort]
force-single-line = true
no-sections = true
lines-after-imports = 2
lines-between-types = 1
force-wrap-aliases = true
combine-as-imports = true

[format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ lint: setup-web venv-tools mypy format-check

.PHONY: mypy
mypy: venv-tools
venv-tools/bin/mypy packages/flare
venv-tools/bin/mypy --config-file mypy.ini packages/flare

.PHONY: splunk-local
splunk-local: venv setup-web
Expand Down
13 changes: 13 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
[mypy]
namespace_packages = True
explicit_package_bases = True
ignore_missing_imports = True
warn_unused_ignores = True
warn_unused_configs = True
warn_redundant_casts = True
warn_no_return = True
no_implicit_optional = True
check_untyped_defs = True
strict_equality = True
disallow_incomplete_defs = True
disallow_untyped_defs = True
disallow_untyped_calls = True
exclude = (vendor*|flare/flare*)/$
follow_imports = skip
mypy_path = packages/flare/python/vendor
1 change: 0 additions & 1 deletion packages/configuration-screen/src/models/splunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ export interface SplunkApplicationNamespace {
export interface SplunkPassword {
name: string;
_properties: {
// eslint-disable-next-line camelcase
clear_password: string;
};
}
Expand Down
139 changes: 116 additions & 23 deletions packages/flare/python/cron_job_ingest_events.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import json
import sys
import os
import sys

from datetime import date
from typing import Any
from typing import Optional


sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor"))
import vendor.splunklib.client as client

from flare import FlareAPI


APP_NAME = "flare"
HOST = "localhost"
SPLUNK_PORT = 8089
REALM = APP_NAME + "_realm"
KV_COLLECTION_NAME = "feednext"


def main():
def main() -> None:
try:
# Example using a token
splunk_service = client.connect(
host=HOST,
port=SPLUNK_PORT,
Expand All @@ -29,44 +33,133 @@ def main():
raise Exception(str(e))

app: client.Application = splunk_service.apps[APP_NAME]
flare_api = FlareAPI(app=app)
create_collection(app=app)

# To avoid cron jobs from doing the same work at the same time, exit new cron jobs if a cron job is already doing work
if get_job_running(app):
return

api_key: Optional[str] = None
tenant_id: Optional[int] = None
for item in app.service.storage_passwords.list():
if item.content.username == "api_key":
api_key = item.clear_password

if item.content.username == "tenant_id":
tenant_id = (
int(item.clear_password) if item.clear_password is not None else None
)

if not api_key:
raise Exception("API key not found")

from_ = get_from_value(app=app)
event_feed = flare_api.retrieve_feed(from_=from_)
set_from_value(app=app, next=event_feed["next"])
if not tenant_id:
raise Exception("Tenant ID not found")

if event_feed["items"]:
for item in event_feed["items"]:
print(json.dumps(item))
set_job_running(app=app, is_running=True)
try:
flare_api = FlareAPI(app=app, api_key=api_key, tenant_id=tenant_id)

next = get_next(app=app, tenant_id=tenant_id)
start_date = get_start_date(app=app)
while True:
event_feed = flare_api.retrieve_feed(next=next, start_date=start_date)
save_start_date(app=app, tenant_id=tenant_id)
save_next(app=app, tenant_id=tenant_id, next=event_feed["next"])

if event_feed["items"]:
for item in event_feed["items"]:
print(json.dumps(item))

if event_feed["next"]:
next = event_feed["next"]
else:
break
except Exception:
pass # Adding logger in next PR

set_job_running(app=app, is_running=False)


def get_from_value(app: client.Application) -> Optional[str]:
from_: Optional[str] = None
def get_collection_value(app: client.Application, key: str) -> Optional[Any]:
    """Look up a single value in the app's KV store collection.

    Args:
        app: Splunk application whose service exposes the KV store.
        key: The ``_key`` of the record to look for.

    Returns:
        The ``value`` field of the first record whose ``_key`` equals ``key``,
        or None when the collection does not exist or no record matches.
    """
    kvstore = app.service.kvstore
    if KV_COLLECTION_NAME not in kvstore:
        return None

    # The whole collection is queried and scanned for the requested key.
    records = kvstore[KV_COLLECTION_NAME].data.query()
    for record in records:
        if record["_key"] == key:
            return record["value"]

    return None


def get_next(app: client.Application, tenant_id: int) -> Optional[str]:
    """Return the 'next' value saved for ``tenant_id``, or None if none is stored.

    The value is kept per tenant under the key ``next_<tenant_id>``.
    """
    tenant_key = "next_{}".format(tenant_id)
    return get_collection_value(app=app, key=tenant_key)

return from_

def get_start_date(app: client.Application) -> Optional[str]:
    """Return the value stored under the ``start_date`` key, or None if absent."""
    stored_start_date = get_collection_value(app=app, key="start_date")
    return stored_start_date

def set_from_value(app: client.Application, next: Optional[str]):

def get_current_tenant_id(app: client.Application) -> Optional[int]:
    """Return the tenant id stored under ``current_tenant_id`` as an int.

    Returns None when no value is stored or the stored value is falsy.
    """
    stored_tenant_id = get_collection_value(app=app, key="current_tenant_id")
    if not stored_tenant_id:
        return None
    return int(stored_tenant_id)


def get_job_running(app: client.Application) -> Optional[bool]:
    """Return True when the stored ``is_job_running`` value is truthy.

    Returns None both when no value is stored and when the stored value is
    falsy, so callers cannot tell those two cases apart.
    """
    running_flag = get_collection_value(app=app, key="is_job_running")
    if not running_flag:
        return None
    return bool(running_flag)


def create_collection(app: client.Application) -> None:
    """Create the KV store collection used by this script if it is missing.

    The collection is declared with two string fields: ``_key`` and ``value``.
    """
    kvstore = app.service.kvstore
    if KV_COLLECTION_NAME in kvstore:
        # Nothing to do: the collection already exists.
        return

    field_types = {"_key": "string", "value": "string"}
    kvstore.create(name=KV_COLLECTION_NAME, fields=field_types)
# Insert


def save_start_date(app: client.Application, tenant_id: int) -> None:
    """Maintain the ``start_date`` record and remember which tenant it belongs to.

    - On the very first call (no ``start_date`` stored) today's date is inserted
      so that future requests are based on it.
    - When the tenant differs from the one recorded under ``current_tenant_id``,
      the start date is reset to today. After a tenant switch this keeps the
      newly selected tenant from ingesting events older than the switch.
    - The tenant id is then persisted under ``current_tenant_id``. Without this
      the comparison above could never succeed and the start date would be
      reset to today on every call.

    Args:
        app: Splunk application whose KV store collection holds the state.
        tenant_id: The tenant currently configured for ingestion.
    """
    today = date.today().isoformat()

    # Read the raw record so that "no record yet" (None) can be told apart from
    # a stored value; this decides between insert and update below.
    stored_tenant_id = get_collection_value(app=app, key="current_tenant_id")
    current_tenant_id = int(stored_tenant_id) if stored_tenant_id else None
    tenant_changed = current_tenant_id != tenant_id

    collection_data = app.service.kvstore[KV_COLLECTION_NAME].data

    if not get_start_date(app):
        # First request ever: seed the start date with today.
        collection_data.insert(json.dumps({"_key": "start_date", "value": today}))
    elif tenant_changed:
        # Tenant switched: restart ingestion from today for the new tenant.
        collection_data.update(id="start_date", data=json.dumps({"value": today}))

    if tenant_changed:
        # Stored as a string to match the collection's declared field type.
        tenant_value = str(tenant_id)
        if stored_tenant_id is None:
            collection_data.insert(
                json.dumps({"_key": "current_tenant_id", "value": tenant_value})
            )
        else:
            collection_data.update(
                id="current_tenant_id", data=json.dumps({"value": tenant_value})
            )


def save_next(app: client.Application, tenant_id: int, next: Optional[str]) -> None:
    """Persist the 'next' value for ``tenant_id`` so later runs continue from it.

    Nothing is written when ``next`` is empty or None. Otherwise the record
    ``next_<tenant_id>`` is inserted if the tenant has no saved value yet and
    updated if it does.
    """
    if not next:
        return

    already_saved = get_next(app, tenant_id=tenant_id)
    tenant_key = "next_{}".format(tenant_id)
    collection_data = app.service.kvstore[KV_COLLECTION_NAME].data

    if already_saved:
        collection_data.update(id=tenant_key, data=json.dumps({"value": next}))
    else:
        collection_data.insert(json.dumps({"_key": tenant_key, "value": next}))


def set_job_running(app: client.Application, is_running: bool) -> None:
    """Store the ``is_job_running`` flag in the KV store collection.

    The record is inserted the first time and updated afterwards. The choice
    is based on whether the record exists, not on the flag's value:
    ``get_job_running`` returns None for a stored falsy flag as well as for a
    missing record, so relying on it would re-insert an already existing
    ``_key`` once the flag had been set back to False.

    Args:
        app: Splunk application whose KV store collection holds the flag.
        is_running: True while a cron job is doing work, False once it is done.
    """
    record_exists = get_collection_value(app=app, key="is_job_running") is not None
    collection_data = app.service.kvstore[KV_COLLECTION_NAME].data

    if record_exists:
        collection_data.update(
            id="is_job_running", data=json.dumps({"value": is_running})
        )
    else:
        collection_data.insert(
            json.dumps({"_key": "is_job_running", "value": is_running})
        )


Expand Down
52 changes: 21 additions & 31 deletions packages/flare/python/flare.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from http.client import HTTPMessage
import typing as t
import sys
import typing as t
import vendor.splunklib.client as client

from vendor.requests.auth import AuthBase

from datetime import date
from http.client import HTTPMessage
from urllib.error import HTTPError
from vendor.flareio import FlareApiClient
import vendor.splunklib.client as client
from vendor.requests.auth import AuthBase


APP_NAME = "flare"

Expand Down Expand Up @@ -36,41 +37,30 @@ def get_flare_api_client(


class FlareAPI(AuthBase):
def __init__(self, *, app: client.Application) -> None:
# Should be able to use app.service.storage_passwords.get(),
# but I can't seem to get that to work. list() works.
api_key: t.Optional[str] = None
tenant_id: t.Optional[int] = None
for item in app.service.storage_passwords.list():
if item.content.username == "api_key":
api_key = item.clear_password

if item.content.username == "tenant_id":
tenant_id = (
int(item.clear_password)
if item.clear_password is not None
else None
)

if not api_key:
raise Exception("API key not found")

def __init__(
self, *, app: client.Application, api_key: str, tenant_id: int
) -> None:
self.flare_endpoints = app.service.confs["flare"]["endpoints"]

self.flare_client = get_flare_api_client(
api_key=api_key,
tenant_id=tenant_id,
)

def retrieve_feed(self, *, from_: t.Optional[str] = None) -> dict[str, t.Any]:
def retrieve_feed(
self, *, next: t.Optional[str] = None, start_date: t.Optional[str] = None
) -> dict[str, t.Any]:
url = self.flare_endpoints["me_feed_endpoint"]
response = self.flare_client.post(
params: t.Dict[str, t.Any] = {"lite": True, "size": 50}
if next:
params["search_after"] = next
else:
params["time"] = "{}@".format(
start_date if start_date else date.today().isoformat()
)
response = self.flare_client.get(
url=url,
json={"lite": "true", "from": from_},
headers={
"Content-type": "application/json",
"Accept": "application/json",
},
params=params,
)

if response.status_code != 200:
Expand Down
10 changes: 6 additions & 4 deletions packages/flare/python/flare_external_requests.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import sys
import os
import splunk # type: ignore[import-not-found]
import json
import os
import splunk
import sys

from typing import Any


sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor"))
from flareio import FlareApiClient

Expand All @@ -17,7 +19,7 @@ def parseParams(payload: str) -> dict[str, Any]:


class FlareUserTenants(splunk.rest.BaseRestHandler):
def handle_POST(self):
def handle_POST(self) -> None:
payload = self.request["payload"]
params = parseParams(payload)
self.flare_client = FlareApiClient(api_key=params["apiKey"])
Expand Down
1 change: 1 addition & 0 deletions requirements.tools.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pytest==8.3.2
mypy==1.12.1
ruff==0.7.0
splunk-appinspect==3.8.0
isort==5.13.2

0 comments on commit cf66e1e

Please sign in to comment.