86 lines
3.0 KiB
Python
86 lines
3.0 KiB
Python
import os
|
|
import time
|
|
import urllib3
|
|
from elasticsearch import Elasticsearch, helpers
|
|
from .es_auth import resolve_api_key
|
|
from .logging_config import setup_logging
|
|
|
|
# Silence urllib3's InsecureRequestWarning for the whole process.
# NOTE(review): this runs unconditionally at import time; it does not look at
# ES_VERIFY_SSL, so the warning is hidden even when verification is enabled.
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

# Module-wide logger produced by the project's logging setup helper.
logger = setup_logging("es_client")
|
|
|
|
class ESClient:
    """Wrapper around an ``Elasticsearch`` client configured from the environment.

    Environment variables read by ``__init__``:
        ES_URL         -- cluster URL (default ``http://localhost:9200``).
        ES_API_ID      -- API key id, handed to ``resolve_api_key``.
        ES_API_KEY     -- API key secret, handed to ``resolve_api_key``.
        ES_USER        -- basic-auth user name (default ``elastic``).
        ES_PASS        -- basic-auth password (default ``changeme``).
        ES_VERIFY_SSL  -- verify TLS certificates; ``1``/``true``/``yes``/``on``
                          (case-insensitive) enable it, anything else disables
                          it (default: enabled).

    API key authentication is used when ``resolve_api_key`` yields both an id
    and a key; otherwise the client falls back to basic authentication.

    The instance can be used as a context manager; leaving the ``with`` block
    closes the underlying client.
    """

    # Spellings accepted as "enabled" for boolean environment variables.
    _TRUE_VALUES = frozenset({"1", "true", "yes", "on"})

    def __init__(self):
        """Read the configuration from the environment and build the client."""
        self.url = os.getenv("ES_URL", "http://localhost:9200")
        self.api_id, self.api_key = resolve_api_key(
            os.getenv("ES_API_ID"),
            os.getenv("ES_API_KEY"),
        )
        self.user = os.getenv("ES_USER", "elastic")
        self.password = os.getenv("ES_PASS", "changeme")
        self.verify_ssl = self._env_flag("ES_VERIFY_SSL", default=True)

        # Options shared by both authentication modes.
        common = {"verify_certs": self.verify_ssl, "ssl_show_warn": False}

        if self.api_id and self.api_key:
            # Use API key authentication
            self.client = Elasticsearch(
                self.url,
                api_key=(self.api_id, self.api_key),
                **common,
            )
            logger.info("Using Elasticsearch API key authentication.")
        else:
            if self.api_id or self.api_key:
                # Exactly one half of the API key was resolved: make the
                # fallback visible instead of silently switching auth modes.
                logger.warning(
                    "Incomplete Elasticsearch API key configuration "
                    "(id or key missing); falling back to basic authentication."
                )
            # Fallback to basic auth
            self.client = Elasticsearch(
                self.url,
                basic_auth=(self.user, self.password),
                **common,
            )
            logger.info("Using Elasticsearch basic authentication.")

    @classmethod
    def _env_flag(cls, name, default=True):
        """Return the boolean value of environment variable *name*.

        An unset variable yields *default*. A set variable is true when its
        stripped, lower-cased value is one of ``_TRUE_VALUES`` and false
        otherwise (including the empty string).
        """
        raw = os.getenv(name)
        if raw is None:
            return default
        return raw.strip().lower() in cls._TRUE_VALUES

    def close(self):
        """Close the underlying Elasticsearch client."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        # Never suppress an exception raised inside the ``with`` block.
        return False

    def check_connection(self):
        """Return the response of ``client.info()``.

        Raises:
            Exception: any error raised by the client is logged and re-raised.
        """
        try:
            return self.client.info()
        except Exception as e:
            logger.error(f"Failed to connect to Elasticsearch: {e}")
            raise

    def bulk_index(self, actions):
        """Bulk index *actions* through ``elasticsearch.helpers.bulk``.

        Args:
            actions: list of dicts compatible with ``elasticsearch.helpers.bulk``.

        Returns:
            A ``(success, failed)`` tuple as returned by ``helpers.bulk`` with
            ``stats_only=False``; ``failed`` is the collection of failed items.
            A falsy *actions* (``None`` or empty) short-circuits to ``(0, [])``
            without contacting the cluster.

        Raises:
            Exception: any error raised by ``helpers.bulk`` is logged and
                re-raised.
        """
        if not actions:
            return 0, []

        try:
            success, failed = helpers.bulk(
                self.client,
                actions,
                stats_only=False,
                raise_on_error=False,
            )
            if failed:
                logger.warning(f"Bulk index had failures: {len(failed)} items failed.")
                for item in failed[:5]:  # Log first 5 failures
                    logger.warning(f"Failure sample: {item}")
            else:
                logger.info(f"Bulk index successful: {success} items.")
            return success, failed
        except Exception as e:
            logger.error(f"Bulk index exception: {e}")
            raise

    def search_hosts(self, index="network-hosts", query=None, size=1000):
        """Search *index* and return the ``_source`` of every hit.

        Args:
            index: index to search (default ``network-hosts``).
            query: query body; ``None`` means ``{"match_all": {}}``.
            size: ``size`` value forwarded to the search call.

        Returns:
            A list with the ``_source`` document of each hit. Best-effort: on
            any exception the error is logged and an empty list is returned,
            so callers cannot tell a failed search from one with no hits.
        """
        if query is None:
            query = {"match_all": {}}

        try:
            resp = self.client.search(index=index, query=query, size=size)
            return [hit["_source"] for hit in resp["hits"]["hits"]]
        except Exception as e:
            logger.error(f"Search failed: {e}")
            return []
|
|
|
|
def get_es_client():
    """Create a new ``ESClient`` (configured from the environment) and return it."""
    es_client = ESClient()
    return es_client
|