Skip to content

Commit

Permalink
Merge pull request #18 from Flared/mahinse/implement_event_ingestion
Browse files Browse the repository at this point in the history
Event ingestion flow
  • Loading branch information
TyMarc authored Nov 1, 2024
2 parents 5e759b3 + 0a54c4e commit 62b27dd
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 79 deletions.
40 changes: 40 additions & 0 deletions .ruff.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
extend-select = [
"I",
"C411",
"C413",
"C414",
"C416",
"C417",
"C419",
"RUF015",
"E401",
]
extend-safe-fixes = [
"C411",
"C413",
"C414",
"C416",
"C417",
"C419",
"RUF015",
"E401",
]
ignore = [
"E722",
"F403",
"F405",
]

[isort]
force-single-line = true
no-sections = true
lines-after-imports = 2
lines-between-types = 1
force-wrap-aliases = true
combine-as-imports = true

[format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ lint: setup-web venv-tools mypy format-check

.PHONY: mypy
mypy: venv-tools
venv-tools/bin/mypy packages/flare
venv-tools/bin/mypy --config-file mypy.ini packages/flare

.PHONY: splunk-local
splunk-local: venv setup-web
Expand Down
13 changes: 13 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
[mypy]
namespace_packages = True
explicit_package_bases = True
ignore_missing_imports = True
warn_unused_ignores = True
warn_unused_configs = True
warn_redundant_casts = True
warn_no_return = True
no_implicit_optional = True
check_untyped_defs = True
strict_equality = True
disallow_incomplete_defs = True
disallow_untyped_defs = True
disallow_untyped_calls = True
exclude = (vendor*|flare/flare*)/$
follow_imports = skip
mypy_path = packages/flare/python/vendor
1 change: 0 additions & 1 deletion packages/configuration-screen/src/models/splunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ export interface SplunkApplicationNamespace {
export interface SplunkPassword {
name: string;
_properties: {
// eslint-disable-next-line camelcase
clear_password: string;
};
}
Expand Down
192 changes: 166 additions & 26 deletions packages/flare/python/cron_job_ingest_events.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
import json
import sys
import os
import sys
import time

from datetime import date
from datetime import datetime
from datetime import timedelta
from enum import Enum
from typing import Any
from typing import Optional


sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor"))
import vendor.splunklib.client as client

from flare import FlareAPI


APP_NAME = "flare"
HOST = "localhost"
SPLUNK_PORT = 8089
REALM = APP_NAME + "_realm"
KV_COLLECTION_NAME = "feednext"
KV_COLLECTION_NAME = "event_ingestion_collection"
CRON_JOB_THRESHOLD_SINCE_LAST_FETCH = timedelta(minutes=10)


class PasswordKeys(Enum):
    """Usernames under which the app's secrets are stored in Splunk storage passwords.

    ``main`` compares each stored password's ``content.username`` against
    these values to pick out the API key and the tenant id.
    """

    # Entry whose clear password is the API key.
    API_KEY = "api_key"
    # Entry whose clear password is the tenant id (parsed with ``int`` by the caller).
    TENANT_ID = "tenant_id"

def main():

class CollectionKeys(Enum):
    """``_key`` values used for the entries of the KV store collection."""

    # Tenant id read back by ``get_current_tenant_id``.
    CURRENT_TENANT_ID = "current_tenant_id"
    # Start date read by ``get_start_date`` and written by ``save_start_date``.
    START_DATE = "start_date"
    # Prefix only: the tenant id is appended to build the per-tenant key.
    NEXT_TOKEN = "next_"
    # ISO-formatted timestamp written by ``save_last_fetched``.
    TIMESTAMP_LAST_FETCH = "timestamp_last_fetch"


def main() -> None:
try:
# Example using a token
splunk_service = client.connect(
host=HOST,
port=SPLUNK_PORT,
Expand All @@ -29,44 +50,163 @@ def main():
raise Exception(str(e))

app: client.Application = splunk_service.apps[APP_NAME]
flare_api = FlareAPI(app=app)
create_collection(app=app)

from_ = get_from_value(app=app)
event_feed = flare_api.retrieve_feed(from_=from_)
set_from_value(app=app, next=event_feed["next"])
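# To prevent overlapping cron jobs from doing the same work at the same time, a new run should exit when another run has fetched recently.
# NOTE(review): the check below returns when the last fetch is OLDER than the threshold, which looks inverted relative to this intent - confirm.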
last_fetched_timestamp = get_last_fetched(app)
if last_fetched_timestamp and last_fetched_timestamp < (
datetime.now() - CRON_JOB_THRESHOLD_SINCE_LAST_FETCH
):
return

if event_feed["items"]:
for item in event_feed["items"]:
print(json.dumps(item))
api_key: Optional[str] = None
tenant_id: Optional[int] = None
for item in app.service.storage_passwords.list():
if item.content.username == PasswordKeys.API_KEY.value:
api_key = item.clear_password

if item.content.username == PasswordKeys.TENANT_ID.value:
tenant_id = (
int(item.clear_password) if item.clear_password is not None else None
)

if not api_key:
raise Exception("API key not found")

if not tenant_id:
raise Exception("Tenant ID not found")

try:
flare_api = FlareAPI(app=app, api_key=api_key, tenant_id=tenant_id)

next = get_next(app=app, tenant_id=tenant_id)
start_date = get_start_date(app=app)
for response in flare_api.retrieve_feed(next=next, start_date=start_date):
save_last_fetched(app=app)

# Rate limiting.
time.sleep(1)

if response.status_code != 200:
print(response.text, file=sys.stderr)
return

event_feed = response.json()
save_start_date(app=app, tenant_id=tenant_id)
save_next(app=app, tenant_id=tenant_id, next=event_feed["next"])

if event_feed["items"]:
for item in event_feed["items"]:
print(json.dumps(item))
except Exception as e:
print("Exception={}".format(e))


def get_next(app: client.Application, tenant_id: int) -> Optional[str]:
    """Return the ``next`` value saved for ``tenant_id``, or ``None`` if absent.

    The value is stored per tenant: the collection key is the
    ``CollectionKeys.NEXT_TOKEN`` prefix followed by the tenant id.
    """
    tenant_key = "{}{}".format(CollectionKeys.NEXT_TOKEN.value, tenant_id)
    return get_collection_value(app=app, key=tenant_key)

def get_from_value(app: client.Application) -> Optional[str]:
from_: Optional[str] = None
if KV_COLLECTION_NAME in app.service.kvstore:
data = app.service.kvstore[KV_COLLECTION_NAME].data.query()
if len(data) > 1:
from_ = data[0]["value"]
else:
app.service.kvstore.delete(KV_COLLECTION_NAME)

return from_
def get_start_date(app: client.Application) -> Optional[str]:
    """Return the value stored under the start-date key, or ``None`` if absent."""
    start_date_key = CollectionKeys.START_DATE.value
    return get_collection_value(app=app, key=start_date_key)


def set_from_value(app: client.Application, next: Optional[str]):
def get_current_tenant_id(app: client.Application) -> Optional[int]:
    """Return the tenant id stored in the collection as an ``int``.

    Returns ``None`` when no value is stored, when the stored value is
    empty, or when it cannot be converted to an integer.
    """
    raw_tenant_id = get_collection_value(
        app=app, key=CollectionKeys.CURRENT_TENANT_ID.value
    )
    if not raw_tenant_id:
        return None

    try:
        return int(raw_tenant_id)
    except (TypeError, ValueError):
        # Best effort: a malformed stored value is treated as "no tenant".
        # Only conversion errors are swallowed so unrelated failures surface.
        return None


def get_last_fetched(app: client.Application) -> Optional[datetime]:
    """Return the stored last-fetch timestamp parsed as a ``datetime``.

    Returns ``None`` when no timestamp is stored or when the stored value
    is not a valid ISO-format string.
    """
    raw_timestamp = get_collection_value(
        app=app, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value
    )
    if not raw_timestamp:
        return None

    try:
        return datetime.fromisoformat(raw_timestamp)
    except (TypeError, ValueError):
        # Best effort: an unparsable value is treated as "never fetched".
        # Only parse errors are swallowed so unrelated failures surface.
        return None


def create_collection(app: client.Application) -> None:
    """Create the KV store collection if it does not exist yet.

    The collection is declared with two string fields, ``_key`` and ``value``.
    """
    kvstore = app.service.kvstore
    if KV_COLLECTION_NAME in kvstore:
        # Nothing to do: the collection is already there.
        return

    collection_fields = {"_key": "string", "value": "string"}
    kvstore.create(name=KV_COLLECTION_NAME, fields=collection_fields)
# Insert


def save_start_date(app: client.Application, tenant_id: int) -> None:
    """Store today's date as the start date when it is missing or the tenant changed.

    The start date is (re)set to today in two situations:

    * no start date has been stored yet (first request ever), so that future
      requests are based on today;
    * the tenant id stored in the collection differs from ``tenant_id``, so
      that a newly selected tenant does not ingest events from before the
      switch.

    The value is always a date-only ISO string (``date.today().isoformat()``),
    so both situations store the same format under the same key.
    """
    # NOTE(review): nothing visible in this file writes
    # CollectionKeys.CURRENT_TENANT_ID; if no other component does, the tenant
    # comparison below is always true - confirm.
    current_tenant_id = get_current_tenant_id(app=app)
    start_date_missing = not get_start_date(app)
    tenant_changed = current_tenant_id != tenant_id

    if start_date_missing or tenant_changed:
        # save_collection_value inserts or updates as needed, so one write
        # covers both the first-run case and the tenant-switch case.
        save_collection_value(
            app=app,
            key=CollectionKeys.START_DATE.value,
            value=date.today().isoformat(),
        )


def save_next(app: client.Application, tenant_id: int, next: Optional[str]) -> None:
    """Persist ``next`` for ``tenant_id`` so a later run can resume from it.

    An empty or ``None`` value is ignored and the stored value is left as is.
    """
    if next:
        # Stored per tenant: NEXT_TOKEN prefix followed by the tenant id.
        tenant_key = "{}{}".format(CollectionKeys.NEXT_TOKEN.value, tenant_id)
        save_collection_value(app=app, key=tenant_key, value=next)


def save_last_fetched(app: client.Application) -> None:
    """Record the current local time (ISO format) as the last-fetch timestamp."""
    fetched_at = datetime.now().isoformat()
    save_collection_value(
        app=app, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value, value=fetched_at
    )


def get_collection_value(app: client.Application, key: str) -> Optional[str]:
    """Return the ``value`` of the collection entry whose ``_key`` equals ``key``.

    Returns ``None`` when the collection does not exist or holds no entry
    with that key.
    """
    kvstore = app.service.kvstore
    if KV_COLLECTION_NAME not in kvstore:
        return None

    entries = kvstore[KV_COLLECTION_NAME].data.query()
    # The first matching entry wins; fall back to None when nothing matches.
    return next((entry["value"] for entry in entries if entry["_key"] == key), None)


def save_collection_value(app: client.Application, key: str, value: Any) -> None:
    """Insert or update the collection entry identified by ``key``.

    If no entry with ``key`` is found, a new ``{"_key": key, "value": value}``
    document is inserted; otherwise the existing entry's ``value`` is updated.
    """
    collection_data = app.service.kvstore[KV_COLLECTION_NAME].data

    # Compare against None rather than truthiness so that an existing entry
    # holding an empty value is updated instead of being inserted a second time.
    if get_collection_value(app=app, key=key) is None:
        collection_data.insert(json.dumps({"_key": key, "value": value}))
    else:
        collection_data.update(id=key, data=json.dumps({"value": value}))


Expand Down
Loading

0 comments on commit 62b27dd

Please sign in to comment.