client auf configparser umgestellt

This commit is contained in:
+++ 2020-09-01 18:16:49 +02:00
commit b240de859c
5 changed files with 305 additions and 288 deletions

20
setstatus.conf Normal file
View file

@ -0,0 +1,20 @@
# file: setstatus.conf
# Setstatus.conf is part of doorstatus - a programm to change the krautspaces
# doorstatus. This is the configuration file for the client who triggers the
# change.
[general]
timeout = 3.0
loglevel = info
[server]
host = nr18.space
port = 10001
cert = ./certs/server.crt
fqdn = server.status.kraut.space
[client]
cert = ./certs/client.crt
key = ./certs/client.key

View file

@ -5,85 +5,164 @@
# date: 26.07.2019
# email: berhsi@web.de
# client, that connects to the statusserver at port 10001 to update the
# krautspace door status. allowed arguments are 0 or 1.
# Setstatus.py is part of doorstatus - a programm to deal with the
# krautspaces doorstatus.
# client, who connects to the statusserver at port 10001 to update the
# krautspace door status. If no status is given as argument, he reads from
# stdin until input is 0 or 1.
import argparse
import socket
import ssl
import sys
import os
import socket
import logging
import configparser
from sys import exit, argv
def check_arguments(argv):
'''
Checks length and validity of command line argument vectors. If there is
no argument or argument is not valid, it returns None. Otherwise it
converts the string value into a byte value.
param 1: array of strings
return: None or byte value
'''
if len(argv) == 1:
byte_value = None
else:
if argv[1].strip() == '0' or argv[1].strip() == '1':
i = int(argv[1].strip())
logging.debug('Set value to {}'.format(i))
byte_value = bytes([i])
else:
byte_value = None
return byte_value
def read_argument():
'''
Reads from stdin until the given value is valid. Convert the given
string to a byte value and return this value.
return: byte value
'''
status = None
while status is None:
buf = input('Enter new status (0/1): ')
if buf == '0' or buf == '1':
status = bytes([int(buf)])
logging.debug('Read status: {}'.format(status))
return status
def print_config(config):
'''
Logs the config with level debug.
'''
logging.debug('Using config:')
for section in config.sections():
logging.debug('Section {}'.format(section))
for i in config[section]:
logging.debug(' {}: {}'.format(i, config[section][i]))
def main():
description = "Set door status of Krautspace"
parser = argparse.ArgumentParser(description=description)
parser.add_argument("status_code", help="status to set", type=int,
choices=(0, 1))
args = parser.parse_args()
print("Status set to {}".format(bytes([args.status_code])))
HOST = 'localhost'
PORT = 10001
SERVER_NAME = 'server.status.kraut.space'
CLIENT_CERT = './certs/client.crt'
CLIENT_KEY = './certs/client.key'
SERVER_CERT = './certs/server.crt'
STATUS = None
RESPONSE = None
print('Check certs')
for certfile in (CLIENT_CERT, CLIENT_KEY, SERVER_CERT):
if os.access(certfile, os.R_OK) is False:
print('Failed to read cert: {}'.format(certfile))
sys.exit(1)
try:
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH,
cafile=SERVER_CERT)
context.load_cert_chain(certfile=CLIENT_CERT, keyfile=CLIENT_KEY)
context.set_ciphers('EECDH+AESGCM') # only ciphers for tls 1.2 and 1.3
context.options |= getattr(ssl._ssl, 'OP_NO_COMPRESSION', 0)
print('SSL context created')
except Exception as e:
print('Failed to create ssl context: {}'.format(e))
sys.exit(2)
loglevel = logging.DEBUG
formatstring = '%(asctime)s: %(levelname)s: %(message)s'
logging.basicConfig(format=formatstring, level=loglevel)
default_config = {
'general': {
'timeout': 5.0,
'loglevel': 'warning'
},
'server': {
'host': 'localhost',
'port': 10001,
'cert': './certs/server.crt',
'fqdn': 'server.status.kraut.space'
},
'client': {
'cert': './certs/client.crt',
'key': './certs/client.key'
}
}
configfile = './setstatus.conf'
config = configparser.ConfigParser()
config.read_dict(default_config)
if not config.read(configfile):
logging.warning('Configuration file {} not found or not readable.'.format(
configfile))
logging.warning('Using default values.')
logger = logging.getLogger()
if not config['general']['loglevel'] in ('critical', 'error', 'warning',
'info', 'debug'):
logging.warning('Invalid loglevel %s given. Using default level %s.',
config['general']['loglevel'],
default_config['general']['loglevel'])
config.set('general', 'loglevel', default_config['general']['loglevel'])
logger.setLevel(config['general']['loglevel'].upper())
print_config(config)
STATUS = check_arguments(argv)
while STATUS is None:
STATUS = read_argument()
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH,
cafile=config['server']['cert'])
# use only cyphers for tls version 1.2 and 1.3
context.set_ciphers('EECDH+AESGCM')
context.options |= getattr(ssl._ssl, 'OP_NO_COMPRESSION', 0)
context.load_cert_chain(certfile=config['client']['cert'],
keyfile=config['client']['key'])
logging.debug('SSL context created')
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
print('Socket created')
logging.debug('Socket created')
try:
conn = context.wrap_socket(mySocket, server_side=False,
server_hostname=SERVER_NAME)
print('Connection wrapped with ssl.context')
conn.settimeout(5.0)
server_hostname=config['server']['fqdn'])
logging.debug('Connection wrapped with ssl.context')
except Exception as e:
print('Context wrapper failed: {}'.format(e))
logging.error('Context wrapper failed: {}'.format(e))
try:
conn.connect((HOST, PORT))
print('Connection established: {}'.format(conn.getpeercert()))
conn.settimeout(float(config['general']['timeout']))
except Exception as e:
logging.debug('Failed to set timeout: {}'.format(e))
try:
conn.connect((config['server']['host'], int(config['server']['port'])))
except socket.timeout:
print('Connection timeout')
logging.eror('Connection timeout')
except Exception as e:
print('Connection failed: {}'.format(e))
sys.exit(3)
logging.error('Connection failed: {}'.format(e))
exit(1)
logging.debug('Peer certificate commonName: {}'.format(
conn.getpeercert()['subject'][5][0][1]))
logging.debug('Peer certificate serialNumber: {}'.format(
conn.getpeercert()['serialNumber']))
try:
print('Send new status: {}'.format(STATUS))
logging.debug('Send new status: {}'.format(STATUS))
conn.send(STATUS)
except Exception as e:
print('Error: {}'.format(e))
sys.exit(4)
logging.error('Error: {}'.format(e))
exit(2)
try:
RESPONSE = conn.recv(1)
print('Server returns: {}'.format(RESPONSE))
logging.debug('Server returns: {}'.format(RESPONSE))
if RESPONSE == STATUS:
print('Status sucessfull updated')
logging.info('Status sucessfull updated')
else:
print('Failed to update status')
print('Disconnect from server')
logging.error('Failed to update status')
logging.debug('Disconnect from server')
except Exception as e:
print('Error: {}'.format(e))
sys.exit(5)
logging.error('Error: {}'.format(e))
exit(3)
if __name__ == '__main__':

View file

@ -1,27 +1,25 @@
# file: statusd.conf
# Configuration file for the server, who is manage the api for door status
# from krautspace jena.
# Statusd.conf is part of doorstatus - a programm to change the krautspaces
# doorstatus. This is the configuration file for the server, who is manage
# the api for door status from krautspace jena.
# host, where server lives (string with fqdn or ipv4). default ist
# Set [server][host] to localhost or 127.0.0.1 if you want listen only to
# localhost.
HOST = '127.0.0.1'
# port, where the server is listen. default is 100001
PORT = 10001
[general]
timeout = 5.0
loglevel = debug
# timeout for connection
TIMEOUT = 5
[server]
host = localhost
port = 10001
cert = ./certs/server.crt
key = ./certs/server.key
# path for ssl keys and certificates. default is the current directory.
SERVER_CERT = './certs/server.crt'
SERVER_KEY = './certs/server.key'
CLIENT_CERT = './certs/client.crt'
[client]
cert = ./certs/client.crt
# path to api files
API_TEMPLATE = './api_template'
API = './api'
# loglevel (maybe CRITICAL(50), ERROR(40), WARNING(30), INFO(20), DEBUG(10))
# default is warning
VERBOSITY = 'debug'
[api]
api = ./api
template = ./api_template

View file

@ -1,26 +0,0 @@
# file: statusd.conf
# Configuration file for the server, who is manage the api for door status
# from krautspace jena.
# host, where server lives (string with fqdn or ipv4). default ist
# localhost.
HOST = 'localhost'
# port, where the server is listen. default is 100001
PORT = 10001
# timeout for connection
TIMEOUT = 5
# path for ssl keys and certificates. default is the current directory.
SERVER_CERT = './server.crt'
SERVER_KEY = './server.key'
CLIENT_CERT = './client.crt'
# path to api files
API_TEMPLATE = './api_template'
API = '/path/to//api'
# loglevel (maybe ERROR, INFO, DEBUG) - not implementet at the moment.
VERBOSITY = 'info'

View file

@ -4,44 +4,18 @@
# date: 26.07.2019
# email: berhsi@web.de
# server, which listens for ipv4 connections at port 10001. now with ssl
# encrypted connection and client side authentication.
# Status server, listening for door status updates. The IPv4 address and port
# to listen on are configurable, by default localhost:10001 is used. The
# connection is secured by TLS and client side authentication.
import json
import logging
import os
import socket
import ssl
import os
import logging
import json
import sys
from time import time, sleep
from sys import exit
def read_config(CONFIGFILE, CONFIG):
'''
reads the given config file and sets the values are founded.
param 1: string
param 2: dictionary
return: boolean
'''
logging.debug('Read configfile {}'.format(CONFIGFILE))
if os.access(CONFIGFILE, os.R_OK):
logging.debug('Configfile is readable')
with open(CONFIGFILE, 'r') as config:
logging.debug('Configfile successfull read')
for line in config.readlines():
if not line[0] in ('#', ';', '\n', '\r'):
key, value = (line.strip().split('='))
key = strip_argument(key).upper()
if key in CONFIG.keys():
value = strip_argument(value)
CONFIG[key] = value
else:
pass
else:
logging.error('Failed to read {}'.format(CONFIGFILE))
logging.error('Using default values')
return False
return True
import configparser
def certs_readable(config):
@ -51,42 +25,28 @@ def certs_readable(config):
param 1: dictionary
return: boolean
'''
for i in (config['SERVER_KEY'], config['SERVER_CERT'],
config['CLIENT_CERT']):
for i in (config['server']['key'], config['server']['cert'],
config['client']['cert']):
if i == '' or os.access(i, os.R_OK) is False:
logging.error('Cant read {}'.format(i))
logging.error('Cannot read {}'.format(i))
return False
return True
def strip_argument(argument):
def print_config(config):
'''
Becomes a string and strips at first whitespaces, second apostrops and
returns the clear string.
param 1: string
return: string
'''
argument = argument.strip()
argument = argument.strip('"')
argument = argument.strip("'")
return argument
def print_config(CONFIG):
'''
Prints the used configuration, if loglevel ist debug.
param 1: dictionary
return: boolean (allways true)
Logs the config with level debug.
'''
logging.debug('Using config:')
for i in CONFIG.keys():
logging.debug('{}: {}'.format(i, CONFIG[i]))
return True
for section in config.sections():
logging.debug('Section {}'.format(section))
for i in config[section]:
logging.debug(' {}: {}'.format(i, config[section][i]))
def print_ciphers(cipherlist):
'''
This function prints the list of the allowed ciphers.
Prints the list of allowed ciphers.
param1: dictionary
return: boolean
'''
@ -96,12 +56,11 @@ def print_ciphers(cipherlist):
for j in i.keys():
print('{}: {}'.format(j, i[j]))
print('\n')
return True
def display_peercert(cert):
'''
This function displays the values of a given certificate.
Displays the values of a given certificate.
param1: dictionary
return: boolean
'''
@ -112,78 +71,81 @@ def display_peercert(cert):
print('\t{}'.format(j))
else:
print('\t{}'.format(cert[i]))
return True
def receive_buffer_is_valid(raw_data):
'''
checks, if the received buffer from the connection is valid or not.
Checks validity of the received buffer contents.
param 1: byte
return: boolean
'''
if raw_data == b'\x00' or raw_data == b'\x01':
if raw_data in (b'\x00', b'\x01'):
logging.debug('Argument is valid: {}'.format(raw_data))
return True
else:
logging.debug('Argument is not valid: {}'.format(raw_data))
return False
logging.debug('Argument is not valid: {}'.format(raw_data))
return False
def change_status(raw_data, api):
'''
Becomes the received byte and the path to API file. Grabs the content of
the API with read_api() and replaces "open" and "lastchange". Write all
lines back to API file.
Write the new status together with a timestamp into the Space API JSON.
param 1: byte
param 2: string
return: boolean
'''
logging.debug('Change status API')
# todo: use walrus operator := when migrating to python >= 3.8
data = read_api(api)
if data is not False:
status, timestamp = set_values(raw_data)
if os.access(api, os.W_OK):
logging.debug('API file is writable')
with open(api, 'w') as api_file:
logging.debug('API file open successfull')
data["state"]["open"] = status
data["state"]["lastchange"] = timestamp
try:
json.dump(data, api_file, indent=4)
except Exception as e:
logging.error('Failed to change API file')
logging.error('{}'.format(e))
logging.debug('API file changed')
else:
logging.error('API file is not writable. Wrong permissions?')
return False
logging.info('Status successfull changed to {}'.format(status))
return True
return False
if data is False:
return False
status, timestamp = set_values(raw_data)
if os.access(api, os.W_OK):
logging.debug('API file is writable')
with open(api, 'w') as api_file:
logging.debug('API file open successfull')
data["state"]["open"] = status
data["state"]["lastchange"] = timestamp
try:
json.dump(data, api_file, indent=4)
except Exception as e:
logging.error('Failed to change API file')
logging.error('{}'.format(e))
logging.debug('API file changed')
else:
logging.error('API file is not writable. Wrong permissions?')
return False
logging.info('Status successfull changed to {}'.format(status))
return True
def read_api(api):
'''
Reads the API file in an buffer und returns the buffer. If anything goes
wrong, it returns False - otherwise it returns the buffer.
Reads the Space API JSON into a dict. Returns the dict on success and
False on failure.
param 1: string
return: string or boolean
return: dict or boolean
'''
logging.debug('Open API file: {}'.format(api))
if os.access(api, os.R_OK):
logging.debug('API is readable')
with open(api, 'r') as api_file:
logging.debug('API opened successfull')
try:
api_json_data = json.load(api_file)
logging.debug('API file read successfull')
except Exception as e:
logging.error('Failed to read API file(): {}'.format(e))
return False
return (api_json_data)
logging.error('Failed to read API file')
return False
# return early if the API JSON cannot be read
if not os.access(api, os.R_OK):
logging.error('Failed to read API file')
return False
logging.debug('API is readable')
with open(api, 'r') as api_file:
logging.debug('API file successfully opened')
try:
api_json_data = json.load(api_file)
logging.debug('API file read successfull')
except Exception as e:
logging.error('Failed to read API file: {}'.format(e))
return False
return api_json_data
def set_values(raw_data):
@ -193,45 +155,21 @@ def set_values(raw_data):
param 1: byte
return: tuple
'''
status = "true" if raw_data == b'\x01' else "false"
timestamp = str(time()).split('.')[0]
if raw_data == b'\x01':
status = "true"
else:
status = "false"
logging.debug('Set values for timestamp: {} and status: {}'.format(
timestamp, status))
return (status, timestamp)
def read_loglevel(CONFIG):
'''
The function translates the value string from config verbosity option to
a valid logging option.
param1: dictionary
return: boolean or integer
'''
if CONFIG['VERBOSITY'] == 'critical':
loglevel = logging.CRITICAL
elif CONFIG['VERBOSITY'] == 'error':
loglevel = logging.ERROR
elif CONFIG['VERBOSITY'] == 'warning':
loglevel = logging.WARNING
elif CONFIG['VERBOSITY'] == 'info':
loglevel = logging.INFO
elif CONFIG['VERBOSITY'] == 'debug':
loglevel = logging.DEBUG
else:
loglevel = False
return(loglevel)
def main():
'''
The main function - opens a socket, create a ssl context, load certs and
listen for connections. at ssl context we set only one available cipher
The main function - open a socket, create a ssl context, load certs and
listen for connections. At SSL context we set only one available cipher
suite and disable compression.
OP_NO_COMPRESSION: prevention against crime attack
OP_DONT_ISERT_EMPTY_FRAGMENTS: prevention agains cbc 4 attack
OP_NO_COMPRESSION: prevention against CRIME attack
OP_DONT_ISERT_EMPTY_FRAGMENTS: prevention agains CBC 4 attack
(cve-2011-3389)
'''
@ -239,90 +177,99 @@ def main():
formatstring = '%(asctime)s: %(levelname)s: %(message)s'
logging.basicConfig(format=formatstring, level=loglevel)
CONFIG = {
'HOST': 'localhost',
'PORT': 10001,
'SERVER_CERT': './server.crt',
'SERVER_KEY': './server.key',
'CLIENT_CERT': './client.crt',
'TIMEOUT': 3.0,
'API': './api',
'API_TEMPLATE': './api_template',
'VERBOSITY': 'warning'
default_config = {
'general': {
'timeout': 3.0,
'loglevel': 'warning'
},
'server': {
'host': 'localhost',
'port': 10001,
'cert': './certs/server.crt',
'key': './certs/server.key'
},
'client': {
'cert': './certs/client.crt'
},
'api': {
'api': './api',
'template': './api_template'
}
CONFIG_FILE = './statusd.conf'
read_config(CONFIG_FILE, CONFIG)
loglevel = read_loglevel(CONFIG)
if loglevel is not False:
logger = logging.getLogger()
logger.setLevel(loglevel)
else:
loglevel = logging.WARNING
logger = logging.getLogger()
logger.setLevel(loglevel)
logging.warning('Invalid value for loglevel. Set default value')
}
configfile = './statusd.conf'
config = configparser.ConfigParser()
config.read_dict(default_config)
if not config.read(configfile):
logging.warning('Configuration file %s not found or not readable. Using default values.',
configfile)
print_config(CONFIG)
logger = logging.getLogger()
if not config['general']['loglevel'] in ('critical', 'error', 'warning', 'info', 'debug'):
logging.warning('Invalid loglevel %s given. Using default level %s.',
config['general']['loglevel'],
default_config['general']['loglevel'])
config.set('general', 'loglevel', default_config['general']['loglevel'])
logger.setLevel(config['general']['loglevel'].upper())
print_config(config)
# todo: zertifikate sollten nur lesbar sein!
if certs_readable(CONFIG) is False:
if not certs_readable(config):
logging.error('Cert check failed\nExit')
exit()
sys.exit(1)
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.verify_mode = ssl.CERT_REQUIRED
context.load_cert_chain(certfile=CONFIG['SERVER_CERT'],
keyfile=CONFIG['SERVER_KEY'])
context.load_verify_locations(cafile=CONFIG['CLIENT_CERT'])
context.load_cert_chain(certfile=config['server']['cert'],
keyfile=config['server']['key'])
context.load_verify_locations(cafile=config['client']['cert'])
context.set_ciphers('EECDH+AESGCM') # only ciphers for tls 1.2 and 1.3
context.options = ssl.OP_CIPHER_SERVER_PREFERENCE
# ssl + kompression = schlecht
context.options |= getattr(ssl._ssl, 'OP_NO_COMPRESSION', 0)
# ensure, compression is disabled (disabled by default anyway at the moment)
context.options |= ssl.OP_NO_COMPRESSION
logging.debug('SSL context created')
# print_ciphers(context.get_ciphers())
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
logging.debug('Socket created')
try:
mySocket.bind((CONFIG['HOST'], int(CONFIG['PORT'])))
mySocket.bind((config['server']['host'], int(config['server']['port'])))
mySocket.listen(5)
logging.info('Listen on {} at Port {}'.format(CONFIG['HOST'],
CONFIG['PORT']))
except Exception as e:
logging.error('unable to bind and listen')
logging.error('Unable to bind and listen')
logging.error('{}'.format(e))
exit()
sys.exit(1)
logging.info('Listening on {} at Port {}'.format(config['server']['host'],
config['server']['port']))
while True:
try:
fromSocket, fromAddr = mySocket.accept()
logging.info('Client connected: {}:{}'.format(fromAddr[0],
fromAddr[1]))
logging.info('Client connected: {}:{}'.format(fromAddr[0], fromAddr[1]))
try:
fromSocket.settimeout(float(CONFIG['TIMEOUT']))
fromSocket.settimeout(float(config['general']['timeout']))
logging.debug('Connection timeout set to {}'.format(
CONFIG['TIMEOUT']))
config['general']['timeout']))
except Exception:
logging.error('Canot set timeout to {}'.format(
CONFIG['TIMEOUT']))
logging.error('Use default value: 3.0')
fromSocket.settimeout(3.0)
logging.error('Cannot set timeout to {}'.format(
config['general']['timeout']))
try:
conn = context.wrap_socket(fromSocket, server_side=True)
conn.settimeout(3.0)
# display_peercert(conn.getpeercert())
logging.debug('Connection established')
logging.debug('Peer certificate commonName: {}'.format
(conn.getpeercert()['subject'][5][0][1]))
logging.debug('Peer certificate serialNumber: {}'.format
(conn.getpeercert()['serialNumber']))
conn.settimeout(float(config['general']['timeout']))
except socket.timeout:
logging.error('Socket timeout')
except Exception as e:
logging.error('Connection failed: {}'.format(e))
logging.info('Connection established')
logging.info('Peer certificate commonName: {}'.format(
conn.getpeercert()['subject'][5][0][1]))
logging.debug('Peer certificate serialNumber: {}'.format(
conn.getpeercert()['serialNumber']))
raw_data = conn.recv(1)
if receive_buffer_is_valid(raw_data) is True:
if change_status(raw_data, CONFIG['API']) is True:
if change_status(raw_data, config['api']['api']) is True:
logging.debug('Send {} back'.format(raw_data))
conn.send(raw_data)
# change_status returns false:
@ -332,15 +279,14 @@ def main():
conn.send(b'\x03')
# receive_handle returns false:
else:
logging.info('Invalid argument recived: {}'.format(
raw_data))
logging.info('Invalid argument received: {}'.format(raw_data))
logging.debug('Send {} back'.format(b'\x03'))
if conn:
conn.send(b'\x03')
sleep(0.1) # protection against dos
except KeyboardInterrupt:
logging.info('Exit')
exit()
sys.exit(1)
except Exception as e:
logging.error('{}'.format(e))
continue