How to send syslog to Splunk HTTP Event Collector

Jakub Jóźwicki
2 min read · Oct 8, 2021

We need to create a rule file in /etc/rsyslog.d/ that passes the log lines to an external program.

# Tail the application log with imfile and hand every new line to the
# omsplunkhec.py helper, which omprog starts and feeds on stdin.
module(load="imfile")
module(load="omprog")

# Lines read from the file are routed only to the "app-log" ruleset.
input(type="imfile"
      File="/opt/app/app.log"
      Tag="app"
      Facility="user"
      Severity="info"
      PersistStateInterval="1"
      reopenOnTruncate="on"
      freshStartTail="on"
      ruleset="app-log")

ruleset(name="app-log") {
    # The first two positional arguments of the helper are the HEC token and
    # the Splunk server name; the remaining ones are its optional flags.
    action(type="omprog"
           binary="/usr/local/bin/omsplunkhec.py 012345678-0123-4567-8901-202110050304 central-splunk --port 8080 --source=app-node --sourcetype=syslog --index=main --maxThreads=4"
           template="RSYSLOG_TraditionalFileFormat")
    stop
}
---
#! /usr/bin/python3
""" © 2021 Jakub Jozwicki, © 2016 Ryan Faircloth, © 2014 Adiscon GmbH, Apache License, Version 2.0"""
# System modules
import os
import sys
import queue
import threading
import time
import argparse
import select
import requests
import urllib
import json
import time
import socket
import uuid
import logging
# Level names accepted by the --log-level command line option.
_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']


def _log_level_string_to_int(log_level_string):
    """Convert a log level name into the matching ``logging`` level number.

    Written to be used as an argparse ``type=`` callable.

    Args:
        log_level_string: one of the names listed in ``_LOG_LEVEL_STRINGS``.

    Returns:
        The integer level defined by the ``logging`` module for that name.

    Raises:
        argparse.ArgumentTypeError: if the name is not in
            ``_LOG_LEVEL_STRINGS``.
    """
    if log_level_string not in _LOG_LEVEL_STRINGS:
        message = 'invalid choice: {0} (choose from {1})'.format(log_level_string, _LOG_LEVEL_STRINGS)
        raise argparse.ArgumentTypeError(message)
    log_level_int = getattr(logging, log_level_string, logging.INFO)
    # check the logging log_level_choices have not changed from our expected values
    assert isinstance(log_level_int, int)
    return log_level_int
# skeleton config parameters
pollPeriod = 0.2  # the number of seconds between polling for new messages
host = socket.gethostname()

# Command line interface: two positional arguments (token, server) followed
# by optional tuning flags.
parser = argparse.ArgumentParser()
parser.add_argument("token", help="http event collector token")
parser.add_argument("server", help="http event collector fqdn")
parser.add_argument('--port', help="port", default='8088')
parser.add_argument('--ssl', help="use ssl", action='store_true', default=False)
# store_false: passing --ssl_noverify sets the value to False; the value is
# handed to requests as ``verify=``.
parser.add_argument('--ssl_noverify', help="disable ssl validation", action='store_false', default=True)
parser.add_argument('--source', default="hec:syslog:" + host)
parser.add_argument('--sourcetype', default="syslog")
parser.add_argument('--index', default="main")
parser.add_argument('--host', default=host)
parser.add_argument('--maxBatch', help="max number of records allowed in one batch of requests for hec", default=10, type=int)
parser.add_argument('--maxQueue', help="max number of records to be read from rsyslog queue for transfer", default=5000, type=int)
parser.add_argument('--maxThreads', help="max number of threads for work", default=10, type=int)
# store_false: args.nopost is True by default and becomes False when the
# flag is given.
parser.add_argument('--nopost', help="don't post, for debug reason", action='store_false')
parser.add_argument('--log_dir', help="log directory", default="/var/log", type=str)
# nargs='?' allows a bare "--log-level"; const supplies the value for that
# case so the logger never receives None as its level.
parser.add_argument('--log-level', default='CRITICAL', const='CRITICAL',
                    dest='log_level', type=_log_level_string_to_int, nargs='?',
                    help='Set the logging output level. {0}'.format(_LOG_LEVEL_STRINGS))
args = parser.parse_args()

# File logger for the helper itself, written to <log_dir>/omsplunkhec.log.
logger = logging.getLogger('omsplunkhec')
logger.setLevel(args.log_level)
fi = logging.FileHandler(os.path.join(args.log_dir, 'omsplunkhec.log'))
fi.setLevel(args.log_level)
# ASCII separators keep the log file writable under any locale encoding.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fi.setFormatter(formatter)
logger.addHandler(fi)
def onReceive(threadID, mq, tq, evt):
    """Worker loop: drain ``mq`` in batches and POST each batch to Splunk HEC.

    The loop keeps running until ``evt`` is set and ``mq`` is empty. Each
    batch holds at most ``args.maxBatch`` lines, joined into one request body
    for the ``/services/collector/raw`` endpoint. Delivery is best effort: a
    failed or rejected batch is logged and dropped.

    Args:
        threadID: identifier of this worker, used only for logging.
        mq: ``queue.Queue`` of text lines to forward.
        tq: ``queue.Queue`` holding one entry per worker; the worker removes
            one entry and marks it done when it exits, so the caller can
            ``join()`` on it to wait for shutdown.
        evt: ``threading.Event`` set by the caller when no more lines will
            be queued.
    """
    try:
        with requests.Session() as s:
            headers = {'Authorization': 'Splunk ' + args.token}
            session_id = str(uuid.uuid1())
            protocol = 'https' if args.ssl else 'http'
            querystring = urllib.parse.urlencode({'channel': session_id, 'source': args.source, 'sourcetype': args.sourcetype, 'index': args.index, 'host': args.host})
            server_uri = '%s://%s:%s/services/collector/raw?%s' % (protocol, args.server, args.port, querystring)
            logger.info('TheadID %s' % (threadID))
            logger.info('server_uri= %s' % (server_uri))
            while not evt.is_set() or not mq.empty():
                data = []
                sl = 0
                # Collect up to maxBatch lines; give up after two empty polls.
                while sl < 2 and len(data) < args.maxBatch:
                    try:
                        m = mq.get(True, 1)
                    except queue.Empty:
                        sl = sl + 1
                        logger.debug("empty queue, sleeping")
                        time.sleep(3)
                    else:
                        data.append(m)
                        mq.task_done()
                if not data:
                    logger.debug("nothing to send")
                    continue
                d = "".join(data)
                # --nopost is a store_false flag: args.nopost is False only
                # when the flag was given, in which case nothing is sent.
                if not args.nopost:
                    logger.debug("nopost set, skipping %d record(s)" % (len(data)))
                    continue
                try:
                    r = s.post(server_uri, data=d, headers=headers, verify=args.ssl_noverify)
                except requests.exceptions.RequestException:
                    # Keep the worker alive; the batch is dropped, as it is
                    # for a non-200 response.
                    logger.exception("post failed, %d record(s) dropped" % (len(data)))
                    continue
                if r.status_code != 200:
                    logger.warning("r.status_code=%s" % (r.status_code))
                    logger.warning("r=%s" % (r.text))
    finally:
        # Always release this worker's slot, even after an unexpected error,
        # so a join() on tq cannot block forever.
        tq.get()
        tq.task_done()
# Main script: start the worker pool, feed it from stdin, then shut down.
stop_event = threading.Event()
maxAtOnce = args.maxBatch  # not referenced in the visible code; kept as a module-level name
msgQueue = queue.Queue(maxsize=args.maxQueue)
threadQueue = queue.Queue(maxsize=args.maxThreads)
for i in range(args.maxThreads):
    # One entry per worker; each worker removes one entry when it exits.
    threadQueue.put(i)
    worker = threading.Thread(target=onReceive, args=(i, msgQueue, threadQueue, stop_event), daemon=True)
    worker.start()

# Blocking read instead of select(): select() only watches the file
# descriptor and does not see lines already pulled into sys.stdin's buffer,
# so polling could leave buffered lines unread until more input arrived.
# readline() returns '' only when stdin has been closed.
for line in iter(sys.stdin.readline, ''):
    msgQueue.put(line)
logger.info('end of stdin')
stop_event.set()

msgQueue.join()
logger.info('msgQueue joined')
logger.info('waiting for thread shutdown')
threadQueue.join()
sys.stdout.flush()  # very important, Python buffers far too much! https://github.com/rsyslog/rsyslog/issues/22

--

--