Source code for loki_api_client.loki_connect

"""
A Class for collection of logs from a Loki host.

References:
    - https://requests.readthedocs.io/
    - https://grafana.com/docs/loki/latest/api/
"""

import logging
from urllib.parse import urlparse

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

__author__ = "Anand Sanmukhani"
__copyright__ = "Anand Sanmukhani"
__license__ = "MIT"

# set up logging
_logger = logging.getLogger(__name__)

# In case of a connection failure try 2 more times
MAX_REQUEST_RETRIES = 3
# wait 1 second before retrying in case of an error
RETRY_BACKOFF_FACTOR = 1
# retry only on these status
RETRY_ON_STATUS = [408, 429, 500, 502, 503, 504]


[docs]class LokiConnect: """ A Class for collection of metrics from a Loki Host. Args: url (str): url for the loki host headers (dict): A dictionary of http headers to be used to communicate with the host. Example: {"Authorization": "bearer my_oauth_token_to_the_host"} disable_ssl (bool): If set to True, will disable ssl certificate verification for the http requests made to the host retry (Retry): Retry adapter to retry on HTTP errors """ def __init__( self, url: str = "http://127.0.0.1:3100", headers: dict = None, disable_ssl: bool = False, retry: Retry = None, ignore_http_errors: bool = False, ): """Functions as a Constructor for the class LokiConnect.""" if url is None: raise TypeError("missing url") self.headers = headers self.url = url self.loki_host = urlparse(self.url).netloc self._all_metrics = None self.ssl_verification = not disable_ssl self.ignore_http_errors = ignore_http_errors if retry is None: retry = Retry( total=MAX_REQUEST_RETRIES, backoff_factor=RETRY_BACKOFF_FACTOR, status_forcelist=RETRY_ON_STATUS, ) self._session = requests.Session() self._session.mount(self.url, HTTPAdapter(max_retries=retry))
[docs] def ready(self, params: dict = None) -> bool: """ Check if Loki host is ready to accept traffic. Ref: https://grafana.com/docs/loki/latest/api/#get-ready Args: params (dict): Optional dictionary containing parameters to be sent along with the API request. Returns: bool: True if the host is ready, False if the endpoint is not ready. """ params = params or {} response = self._session.get( "{0}/ready".format(self.url), verify=self.ssl_verification, headers=self.headers, params=params, ) return response.ok
[docs] def query( self, query: str, limit: int = 10, time: str = None, direction: str = "backward", params: dict = None, ) -> dict: """ Check if Loki host is ready to accept traffic. Ref: https://grafana.com/docs/loki/latest/api/#query Args: query (str): The LogQL query to perform. limit (int): The max number of entries to return. time (str): The evaluation time for the query as a nanosecond Unix epoch. Defaults to now. direction (str): Determines the sort order of logs. Supported values are "forward" or "backward". Defaults to "backward". params (dict): Optional dictionary containing parameters to be sent along with the API request. Returns: dict: A json of queried log data. """ params = params or {} if query: if not isinstance(query, str): raise TypeError( "Incorrect query type: {}, should be type: {}".format( type(query), str ) ) params["query"] = query else: raise ValueError("query empty") if time: params["time"] = time if limit: params["limit"] = limit direction_supported_values = ["backward", "forward"] if direction not in direction_supported_values: raise ValueError("Invalid direction Value: {}".format(direction)) params["direction"] = direction response = self._session.get( "{0}/loki/api/v1/query".format(self.url), verify=self.ssl_verification, headers=self.headers, params=params, ) if not self.ignore_http_errors: response.raise_for_status() return response.json()