Skip to content

Commit

Permalink
v1.6.3 (#36)
Browse files Browse the repository at this point in the history
* v1.6.3
  • Loading branch information
MikeSchiessl authored Nov 29, 2022
1 parent 6cd4432 commit 7b99996
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 64 deletions.
Empty file added 1
Empty file.
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.10.7-slim-bullseye
FROM python:3.11.0-slim-bullseye
LABEL MAINTAINER="Mike Schiessl - mike.schiessl@akamai.com"
LABEL APP_LONG="Akamai Universal Log Streamer"
LABEL APP_SHORT="ULS"
Expand All @@ -10,8 +10,8 @@ ARG HOMEDIR="/opt/akamai-uls"
ARG ULS_DIR="$HOMEDIR/uls"
ARG EXT_DIR="$ULS_DIR/ext"

ARG ETP_CLI_VERSION="0.3.9"
ARG EAA_CLI_VERSION="0.5.1"
ARG ETP_CLI_VERSION="0.4.0"
ARG EAA_CLI_VERSION="0.5.5"
ARG MFA_CLI_VERSION="0.0.9"
ARG GC_CLI_VERSION="dev"
ARG LINODE_CLI_VERSION="dev"
Expand Down
7 changes: 4 additions & 3 deletions bin/config/global_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3

# Common global variables / constants
__version__ = "1.6.2"
__version__ = "1.6.3"
__tool_name_long__ = "Akamai Unified Log Streamer"
__tool_name_short__ = "ULS"

Expand Down Expand Up @@ -71,10 +71,11 @@
output_tcp_send_buffer = 262144 # TCP Send buffer in bytes
output_tcp_timeout = 10.0 # TCP SEND / CONNECT Timeout (seconds)
## HTTP
output_http_header = {'User-Agent': f'{__tool_name_long__}/{__version__}'} # HTTP Additional Headers to send (requests module KV pairs)
output_http_header = {'User-Agent': f'{__tool_name_long__}/{__version__}', 'Content-Type': 'application/json'} # HTTP Additional Headers to send (requests module KV pairs)
output_http_timeout = 10 # Timeout after which a request will be considered as failed
output_http_aggregate_count = 50 # Number of events to aggregate in POST request to HTTP Collector. 1 mean no aggregation
output_http_aggregate_count = 500 # Number of events to aggregate in POST request to HTTP Collector. 1 mean no aggregation
output_http_aggregate_idle = 5 # Aggregate will send the data regardless of the count if the previous event was x secs ago
output_http_expected_status_code = 200 # Return Code for successful delivery
## FILE
output_file_encoding = "utf-8" # FILE Encoding setting
output_file_handler_choices = ['SIZE', 'TIME'] # Available Choices for the file handler
Expand Down
9 changes: 9 additions & 0 deletions bin/modules/UlsArgsParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ def init():
default=(os.environ.get('ULS_HTTP_FORMAT') or '{"event": %s}'),
help='HTTP Message format expected by http receiver '
'(%%s defines the data string). Default \'{\"event\": %%s}\'')
## HTTP AGGREGATE
output_group.add_argument('--httpaggregate',
action='store',
type=int,
default=(os.environ.get('ULS_HTTP_AGGREGATE') or uls_config.output_http_aggregate_count),
help=f"Number of events to aggregate for one output request "
f"the %%s in the httpformat will be replaced by a LIST of events. "
f"Example: %%s = [{{'event1': 'data1'}},{{'event2': 'data2'}},...] - "
f"Default: {uls_config.output_http_aggregate_count}")

# FILE STUFF
## File Handler
Expand Down
2 changes: 1 addition & 1 deletion bin/modules/UlsInputCli.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def _uls_useragent(self, product, feed):
install_id = UlsTools.get_install_id()['install_id']
if install_id:
header_install_id = f"-{install_id}"
if UlsTools.check_docker():
if UlsTools.check_container():
my_useragent = f'ULS/{uls_config.__version__}_{product}-{feed}{header_install_id}-DKR'
else:
my_useragent = f'ULS/{uls_config.__version__}_{product}-{feed}{header_install_id}'
Expand Down
68 changes: 50 additions & 18 deletions bin/modules/UlsOutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import logging
import logging.handlers
import random
import json

# ULS specific modules
import config.global_config as uls_config
import modules.aka_log as aka_log


stopEvent = threading.Event()
#stopEvent = threading.Event()


class UlsOutput:
Expand All @@ -40,6 +41,7 @@ def __init__(self, output_type: str,
host=None,
port=None,
http_out_format=None,
http_out_aggregate_count=None,
http_out_auth_header=None,
http_url=None,
http_insecure=False,
Expand All @@ -49,7 +51,8 @@ def __init__(self, output_type: str,
filemaxbytes=None,
filetime=None,
fileinterval=None,
fileaction=None):
fileaction=None,
stopEvent=None):
"""
Initialzing a new UlsOutput handler
:param output_type: The desired output format (TCP/ UDP / HTTP)
Expand All @@ -68,11 +71,13 @@ def __init__(self, output_type: str,
self.connected = False # Internal Connection tracker - do not touch
# self.output_type = None
self.http_out_format = None
self.http_out_aggregate_count = None
self.http_url = None
self.httpSession = None
self.port = None
self.host = None
self.clientSocket = None
self.stopEvent = stopEvent

# Handover Parameters
## Check & set output type
Expand All @@ -97,6 +102,7 @@ def __init__(self, output_type: str,

# ---- Begin change for EME-588 ----
self.aggregateList = list()
self.http_out_aggregate_count = http_out_aggregate_count # Added for easier CLI configuration
self.aggregateListTick = None # Last time we added items in the list
# ---- End change for EME-588 ----

Expand Down Expand Up @@ -132,8 +138,7 @@ def __init__(self, output_type: str,
self.http_timeout = uls_config.output_http_timeout

elif self.output_type in ['HTTP'] and not http_url:
aka_log.log.critical(f"{self.name} http_out_format http_out_auth_"
f"header http_url or http_insecure missing- exiting")
aka_log.log.critical(f"{self.name} --httpurl missing - exiting")
sys.exit(1)

# File Parameters
Expand Down Expand Up @@ -173,7 +178,7 @@ def connect(self):
"""

reconnect_counter = 1
while not stopEvent.is_set() and self.connected is False \
while not self.stopEvent.is_set() and self.connected is False \
and reconnect_counter <= self.reconnect_retries:
try:

Expand Down Expand Up @@ -302,8 +307,8 @@ def connect(self):
sys.exit(1)

# Due to a inconsitency in python logging handler (https://bugs.python.org/issue46377?) we need to do this
if self.fileinterval.lower() == "midnight":
self.filetime = 1
if self.filetime.lower() == "midnight":
self.fileinterval = 1

file_handler = logging.handlers.TimedRotatingFileHandler(filename=self.filename, when=self.filetime.lower(), interval=self.fileinterval, backupCount=self.filebackupcount, encoding=self.file_encoding, delay=False, utc=uls_config.output_file_default_time_use_utc, atTime=None)

Expand Down Expand Up @@ -372,6 +377,8 @@ def __call__(self, source, dest):
else:
aka_log.log.critical(f"{self.name} not able to connect to {self.http_url} - "
f"giving up after {reconnect_counter - 1} retries.")

self.stopEvent.set()
sys.exit(1)

def send_data(self, data):
Expand All @@ -385,37 +392,62 @@ def send_data(self, data):
aka_log.log.debug(f"{self.name} Trying to send data via {self.output_type}")

if self.output_type == "TCP":
self.clientSocket.sendall(data)
out_data = data + uls_config.output_line_breaker.encode()
self.clientSocket.sendall(out_data)

elif self.output_type == "UDP":
self.clientSocket.sendto(data, (self.host, self.port))
out_data = data + uls_config.output_line_breaker.encode()
self.clientSocket.sendto(out_data, (self.host, self.port))

elif self.output_type == "HTTP":
self.aggregateList.append(data)
if len(self.aggregateList) == uls_config.output_http_aggregate_count or (
if len(self.aggregateList) < self.http_out_aggregate_count:
self.aggregateList.append(json.loads(data.decode()))
else:
aka_log.log.warning(
f"{self.name} HTTP Aggregation queue is already full - not adding any more entries. Size: "
f"({len(self.aggregateList)}/{self.http_out_aggregate_count})")

if len(self.aggregateList) >= self.http_out_aggregate_count or (
self.aggregateListTick is not None and
self.aggregateListTick < time.time() - uls_config.output_http_aggregate_idle
):
data = uls_config.output_line_breaker.join(
self.http_out_format % (event.decode()) for event in self.aggregateList)
request = requests.Request('POST', url=self.http_url, data=data)
request = requests.Request('POST', url=self.http_url, data=(self.http_out_format % json.dumps(self.aggregateList)))
prepped = self.httpSession.prepare_request(request)
payload_length = prepped.headers["Content-Length"]
response = self.httpSession.send(prepped, verify=self.http_verify_tls, timeout=self.http_timeout)
response.close() # Free up the underlying TCP connection in the connection pool

response = None
try:
response = self.httpSession.send(prepped, verify=self.http_verify_tls, timeout=self.http_timeout)
except Exception as bluu:
print(f"bluu {bluu}")
return False
finally:
if response:

response.close() # Free up the underlying TCP connection in the connection pool

aka_log.log.info(f"{self.name} HTTP POST of {len(self.aggregateList)} event(s) "
f"completed in {(response.elapsed.total_seconds()*1000):.3f} ms, "
f"payload={payload_length} bytes, HTTP response {response.status_code}, "
f"response={response.text} ")
if response.status_code != uls_config.output_http_expected_status_code:
return False
self.aggregateList.clear()
else:
aka_log.log.info(f"{self.name} Data not sent, but added to HTTP aggregation. Size: "
f"({len(self.aggregateList)}/{self.http_out_aggregate_count})")
self.aggregateListTick = time.time()
return True
self.aggregateListTick = time.time()

elif self.output_type == "RAW":
sys.stdout.write(data.decode())
out_data = data + uls_config.output_line_breaker.encode()
sys.stdout.write(out_data.decode())
sys.stdout.flush()

elif self.output_type == "FILE":
self.my_file_writer.info(f"{data.decode().rstrip()}")
out_data = data + uls_config.output_line_breaker.encode()
self.my_file_writer.info(f"{out_data.decode().rstrip()}")

else:
aka_log.log.critical(f"{self.name} target was not defined {self.output_type} ")
Expand Down
11 changes: 8 additions & 3 deletions bin/modules/UlsTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _get_cli_version(cli_bin, edgerc_mock_file):
f"OS Plattform\t\t{platform.platform()}\n"
f"OS Version\t\t{platform.release()}\n"
f"Python Version\t\t{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}\n"
f"Docker Status\t\t{check_docker()}\n"
f"Container Status\t\t{check_container()}\n"
f"RootPath \t\t{root_path}\n"
f"TimeZone (UTC OFST) \t{check_timezone()} ({-time.timezone / 3600})\n"
f"Installation ID \t{get_install_id()['install_id']}"
Expand Down Expand Up @@ -163,8 +163,13 @@ def uls_check_args(input, output):
else:
return 0

def check_docker():
return os.path.isfile('/.dockerenv')

def check_container():
if os.path.isfile('/.dockerenv') or os.path.isfile('/run/.containerenv'):
return True
else:
return False


def check_timezone():
now = datetime.datetime.now()
Expand Down
16 changes: 9 additions & 7 deletions bin/uls.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def main():
port=uls_args.port,
http_out_format=uls_args.httpformat,
http_out_auth_header=uls_args.httpauthheader,
http_out_aggregate_count=uls_args.httpaggregate,
http_url=uls_args.httpurl,
http_insecure=uls_args.httpinsecure,
filehandler=uls_args.filehandler,
Expand All @@ -124,7 +125,8 @@ def main():
filemaxbytes=uls_args.filemaxbytes,
filetime=uls_args.filetime,
fileinterval=uls_args.fileinterval,
fileaction=uls_args.fileaction)
fileaction=uls_args.fileaction,
stopEvent=stopEvent)


# Load a Transformation (if selected) UlsTransformation
Expand Down Expand Up @@ -187,7 +189,7 @@ def main():
f"({uls_args.transformation}): {log_line}")

# Attach Linebreak
out_data = log_line + uls_config.output_line_breaker.encode()
#out_data = log_line + uls_config.output_line_breaker.encode()

# Send the data (through a loop for retransmission)
resend_counter = 1
Expand All @@ -199,23 +201,23 @@ def main():
f" Delivery (output) attempt "
f"{resend_counter} of {uls_config.main_resend_attempts}")
# Send the data
resend_status = my_output.send_data(out_data)
my_monitor.increase_message_count(len(out_data))
aka_log.log.debug(f"<OUT> {out_data}")
resend_status = my_output.send_data(log_line)
my_monitor.increase_message_count(len(log_line))
aka_log.log.debug(f"<OUT> {log_line}")
resend_counter = resend_counter + 1

if resend_counter == uls_config.main_resend_attempts and\
uls_config.main_resend_exit_on_fail:
aka_log.log.critical(f"MSG[{my_monitor.get_message_count()}] "
f"ULS was not able to deliver the log message "
f"{out_data.decode()} after {resend_counter} attempts - Exiting!")
f"{log_line.decode()} after {resend_counter} attempts - Exiting!")
sys.exit(1)
elif resend_counter == uls_config.main_resend_attempts and \
not uls_config.main_resend_exit_on_fail:
aka_log.log.warning(
f"MSG[{my_monitor.get_message_count()}] "
f"ULS was not able to deliver the log message "
f"{out_data.decode()} after {resend_counter} attempts - (continuing anyway as my config says)")
f"{log_line.decode()} after {resend_counter} attempts - (continuing anyway as my config says)")
except queue.Empty:
# No data available, we get a chance to capture the StopEvent
pass
Expand Down
Loading

0 comments on commit 7b99996

Please sign in to comment.