This repository has been archived on 2022-11-05. You can view files and clone it, but cannot push or open issues or pull requests.
jabberbot/plugins/dsa.py

160 lines
4.5 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# file: dsa.py
# date: 26.07.2020
# desc: Serves Debian security alerts
import logging
import urllib3
import threading
from lxml import etree
from manager import Plugin
import common
# Rebinds the name ``logging`` from the standard-library module to the root
# logger object; every ``logging.debug()`` / ``logging.warning()`` call below
# therefore goes through that logger instance.
logging = logging.getLogger()
class Plugin(Plugin):
    '''
    Plugin for the "!dsa" command. Validates the optional count argument
    and hands the work to a DsaThread, which reports its result through
    the callback given to the constructor.

    The base class is the Plugin imported from manager; this definition
    rebinds the module-level name Plugin to the subclass.
    '''
    # Double-underscore class attributes are name-mangled, so they are
    # only reachable through the static accessors below.
    __module = __name__
    __command = 'dsa'
    __description = 'Serves debians security alerts'

    @staticmethod
    def get_module():
        '''
        returns: name of the module this plugin lives in
        '''
        return Plugin.__module

    @staticmethod
    def get_command():
        '''
        returns: the command word that triggers this plugin
        '''
        return Plugin.__command

    @staticmethod
    def get_description():
        '''
        returns: short description of the plugin
        '''
        return Plugin.__description

    def __init__(self, callback=None):
        '''
        param 1: callable that receives the message strings to send
        '''
        self.callback = callback

    def help(self):
        '''
        returns: the help text for the command
        '''
        return ('!dsa serves the actual debian security alerts. A given '
                ' number reduces the count of alerts displayed to number. '
                'Not implemented at the moment.'
                '\nSyntax: !dsa <number>')

    def run(self, stanza):
        '''
        Validates the optional count argument and starts a thread to grab
        debians security alerts, then returns immediately.
        param 1: stanza object
        '''
        count = False
        muc_nick = common.get_nick_from_stanza(stanza)
        arguments = common.get_arguments_from_body(stanza)

        # common.get_arguments_from_body presumably returns False when no
        # argument was given (TODO confirm); any other empty result is
        # skipped as well so that arguments[0] cannot fail.
        if arguments:
            count = self.get_count(arguments[0])
            # Compare by identity: a parsed count of 0 is a valid integer
            # and must not be confused with the False error marker.
            if count is False:
                call_msg = 'Call "!help {}"'.format(self.get_command())
                no_count_msg = ' '.join((
                    'Not a valid count: "{}"!'.format(arguments[0]),
                    call_msg))
                self.callback(': '.join((muc_nick, no_count_msg)))
                return

        logging.debug('Start thread for debian security alerts')
        dsa_thread = DsaThread(self.callback, count)
        # start() runs DsaThread.run() in a separate thread; calling
        # run() directly would block the caller until the fetch is done.
        dsa_thread.start()
        logging.debug('DSA Thread started')

    def get_count(self, item):
        '''
        Try to convert a string into integer.
        param 1: string
        returns: integer or False if the value cannot be converted
        '''
        try:
            return int(item.strip())
        except (AttributeError, TypeError, ValueError) as e:
            logging.warning('Invalid value for count: %s', item)
            logging.warning('Exception: %s', e)
            return False
class DsaThread(threading.Thread):
    '''
    Thread that fetches the Debian security alerts feed and passes the
    formatted result (or an error text) to the callback.
    '''

    def __init__(self, callback, count):
        '''
        param 1: callable that receives the message string
        param 2: requested number of alerts or False; stored on the
                 instance but not evaluated by this class yet
        '''
        threading.Thread.__init__(self)
        self.callback = callback
        self.count = count

    def run(self):
        '''
        Thread body: fetches the feed, checks the HTTP status, parses the
        document and sends exactly one message through the callback.
        '''
        dsa_response = self.get_file()
        if dsa_response is False:
            self.callback('Error while fetching DSA')
            return

        status = dsa_response.status
        logging.debug("Server returns %s", status)
        if status != 200:
            self.callback('Server returns {}'.format(status))
            # Stop here: the body of an error response is not the feed
            # and must not be parsed.
            return

        try:
            xmldoc = etree.fromstring(dsa_response.data)
        except etree.XMLSyntaxError as e:
            # Report broken XML instead of letting the thread die
            # without any answer.
            logging.warning('Failed to parse DSA feed: %s', e)
            self.callback('Error while parsing DSA')
            return

        self.callback(self.string_factory(xmldoc))

    def string_factory(self, xmldoc):
        '''
        Collects the titles of all feed items that carry an rdf:about
        attribute and creates a string to post in the muc. The items are
        listed in reversed document order, one title per line, below a
        heading line.
        param 1: xml object
        returns: string
        '''
        nsmap = {
            "purl": "http://purl.org/rss/1.0/",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        }
        lines = ['Debian Security Alerts:']
        items = xmldoc.xpath('//purl:item[@rdf:about]', namespaces=nsmap)
        for item in reversed(items):
            # Query relative to the item itself: one pass over the feed
            # and no attribute value interpolated into the XPath string.
            titles = item.xpath('purl:title/text()', namespaces=nsmap)
            if titles:
                lines.append(titles[0])
        return '\n'.join(lines)

    def get_file(self):
        '''
        Fetches the security alerts from debian.org
        returns: response object or False if the request failed
        '''
        url = 'https://www.debian.org/security/dsa-long'
        logging.debug('Try to fetch {}'.format(url))
        http = urllib3.PoolManager()
        try:
            return http.request('Get', url)
        except Exception as e:
            # Best effort: any request failure is reported as False, but
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            logging.debug('{}: failed to fetch'.format(url))
            logging.debug('Exception: {}'.format(e))
            return False

    def get_id_from_about(self, about):
        '''
        Extracts the dsa id from the given string: takes the part after
        the last "/", splits it at "-" and converts the second piece to
        an integer. Raises IndexError or ValueError if the string does
        not have that form.
        param 1: string
        returns: integer
        '''
        return int(about.split('/')[-1].split('-')[1])