Arbeit mit unterschiedlichen Datenformaten: XML, JSON, CSV, XSL(X)¶
XML¶
Abfrage Titel und Beschreibung von RSS-Feeds der TUC
import urllib.request, urllib.parse, re, sys
import feedparser
import speedparser3
from xml.dom.minidom import parseString
from lxml import etree
parser = sys.argv[1] if len(sys.argv) > 1 else 's'
for url in ('https://www.tu-chemnitz.de/uk/pressestelle/aktuell/aktuell-full.rss',
'https://blog.hrz.tu-chemnitz.de/bibo/feed/',
'https://blog.hrz.tu-chemnitz.de/urzcommunity/feed/',
'https://blog.hrz.tu-chemnitz.de/botschafter/feed/',
'https://blog.hrz.tu-chemnitz.de/bau/feed/',
'https://www.tu-chemnitz.de/tu/termine/tu-termine.rss'):
if parser == 'f':
p = feedparser.parse(url)
print(p.feed['title'])
print(p.feed['description'])
elif parser == 's':
#p = speedparser3.parse(urllib.request.urlopen(url).read(), clean_html = False)
p = speedparser3.parse(urllib.request.urlopen(url).read())
print(p.feed['title'])
print(p.feed['description'])
elif parser == 'm':
doc = parseString(urllib.request.urlopen(url).read())
print(doc.getElementsByTagName('title')[0].firstChild.data)
print(doc.getElementsByTagName('description')[0].firstChild.data)
elif parser == 'l':
tree = etree.parse(urllib.request.urlopen(url))
print(tree.xpath('//title')[0].text)
print(tree.xpath('//description')[0].text)
print('---')
# ggf. fehlertoleranten Parser erstellen
# https://stackoverflow.com/questions/13046240/parseerror-not-well-formed-invalid-token-using-celementtree/20204635
# XML-Dokument einlesen und parsen
tree = etree.parse(urllib.request.urlopen(url), parser = etree.XMLParser(recover = True))
print(tree.xpath('//title')[0].text)
print(tree.xpath('//description')[0].text)
Parsen von invalidem XML
from lxml import etree
from io import StringIO
# broken xml
xml = '''
<foo start="yes">
111
<bar>
aaa
</foo>
'''
if 0:
tree = etree.parse(StringIO(xml), parser = etree.XMLParser(recover = True))
else:
tree = etree.fromstring(xml, parser = etree.XMLParser(recover = True))
for elem in tree.getiterator():
print(elem.tag)
print(' ATTR:', elem.attrib)
print(' TEXT:', repr(elem.text))
Suche im LDAP der TU Chemnitz
from ldap3 import Connection, SYNC
from ldap3.core.exceptions import LDAPSizeLimitExceededResult
# http://ldap3.readthedocs.io/welcome.html
# http://ldap3.readthedocs.io/tutorial_searches.html
#
# https://www.tu-chemnitz.de/urz/idm/services/ldap.html
#
# LDAP-Server: ldap.tu-chemnitz.de
# Port: 389
# Namens-Wurzel / Basis-DN: ou=Users,dc=tu-chemnitz,dc=de
# Suchfilter (optional): (mail=*)
connection_kwargs = {
'server': 'ldap.tu-chemnitz.de',
'auto_bind': True,
#'client_strategy': SYNC,
}
conn = Connection(**connection_kwargs)
print(conn)
searchParameters = {'search_base': 'ou=Users,dc=tu-chemnitz,dc=de',
#'search_filter': '(& (objectClass=Person) (sn=Müller*))',
#'search_filter': '(& (objectClass=Person) (sn=Trapp*))',
'search_filter': '(mail=*berger*)',
'attributes': ['cn', 'givenName'],
'paged_size': 5 }
# paged search mit Funktion "paged_search()"
cnt = 0
entries = conn.extend.standard.paged_search(**searchParameters)
try:
for entry in entries:
print(entry)
cnt += 1
except LDAPSizeLimitExceededResult:
print('LDAPSizeLimitExceededResult')
print(cnt)
print('============')
# paged search mit eigenem Cookie-Handling
while True:
# eine Page abfragen
conn.search(**searchParameters)
# gefundene Einträge ausgeben
for entry in conn.entries:
print(entry.entry_to_json())
print(entry)
# ggf. Übergang zur nächsten Page
try:
# print(conn.result['controls'])
# {'1.2.840.113556.1.4.319': {'description': ('1.2.840.113556.1.4.319',
# 'CONTROL', 'LDAP Simple Paged Results', 'RFC2696'), 'criticality': False,
# 'value': {'size': 0, 'cookie': b'm\x00\x00\x00\x00\x00\x00\x00'}}}
cookie = conn.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
if cookie:
searchParameters['paged_cookie'] = cookie
print('next page')
else:
# wir haben alle Daten vollständig geholt, daher existiert kein
# Cookie mehr
print('no cookie')
break
except Exception as e:
# der Key 'controls' existiert nicht, wenn die Maximale Entry-Zahl (bei
# uns 100) abgerufen wurde
print(repr(e))
break
JSON¶
Pretty Printer für JSON
#!/bin/env python3
#
# JSON-File "schön" formatiert ausgeben
#
# 8.6.2017
import sys, json
# das zu lesende JSON-File bestimmen
if 0:
# Variante 1: mit vollständiger Alternative
if len(sys.argv) == 1:
# dem vorliegenden Skript wurden keine Kommandozeilenargumente übergeben
json_file = 'services.json'
else:
# wir haben mindestens 1 Kommandozeilenargument und nutzen es als Name der
# JSON-Datei
json_file = sys.argv[1]
else:
# Variante 2: mit ternärem Operator
json_file = 'services.json' if len(sys.argv) == 1 else sys.argv[1]
# die JSON-Daten in ein Dictionary einlesen und schön formatiert wieder ausgeben
print(json.dumps(json.load(open(json_file)), indent = 2))
CSV und XSLX¶
Erzeugung von CSV und XSLX
#!/usr/bin/env python3
#
# Daten in eine CSV- und eine XSLX-Datei ausgeben
#
# siehe auch
#
# http://xlsxwriter.readthedocs.io/
#
# 22.1.2018
import csv, xlsxwriter
from pprint import pprint
# die auszuwertetenden CSV-Zeilen
csv_lines = '''
ID,Name,Vorname,Alter,Gehalt,Prämien
123,Müller,Max,64,1721.56,355.2
758,Ast,Irene,33,2314.88,841.7
999,Becher,Stella,21,1588.64,0.0
'''
# Ausgabe der Daten in ein Excel-File
def write_excel(data, xslx_filename):
# Create a workbook and add a worksheet.
workbook = xlsxwriter.Workbook(xslx_filename, {'strings_to_urls': False})
worksheet = workbook.add_worksheet()
# Add an Excel date format.
float_format = workbook.add_format({'num_format': '0.00'})
int_format = workbook.add_format({'num_format': '######'})
# Add a bold format to use to highlight cells.
bold = workbook.add_format({'bold': 1})
# Start from the first cell. Rows and columns are zero indexed.
row = 0
# Spaltenbreiten definieren
# 1. Spalte: 8
# 2. Spalte: 30
# 3. Spalte: 30
# 4. Spalte: 5
# 5. Spalte: 10
breite = 8, 30, 30, 5, 10, 10
# Spaltenüberschriften ausgeben
for col, text in enumerate(data[0]):
#text = text.encode('utf8')
worksheet.write(row, col, text, bold)
worksheet.set_column(col, col, breite[col])
cell_debug = open(xslx_filename + '.debug', 'w')
# Daten ausgeben
for line in data[1:]:
row += 1
for col, item in enumerate(line):
cell_data = item
# Zellenformat bestimmen:
# - float_format bei float
# - int_format bei int
# - sonst None
cell_format = float_format if isinstance(cell_data, float) else int_format if isinstance(cell_data, int) else None
if cell_data == 0.0:
# die float-Zahl 0.0 durch den Integer 0 ersetzen und damit im
# Excel-Sheet unterdrücken
cell_data = 0
cell_format = int_format
# Zelleninhalt ausgegeben
worksheet.write(row, col, cell_data, cell_format)
print('%s|%s|%r' % (row, col, cell_data), file = cell_debug)
# Excel-File schließen
workbook.close()
if __name__ == '__main__':
# die CSV-Zeilen mit einem csv.reader einlesen
lines = csv_lines.strip('\n').splitlines()
data = tuple(csv.reader(lines))
for line in data[1:]:
# ab Zeile 2 die Integer- und Float-Strings nach int/float konvertieren
for i in 0, 3:
line[i] = int(line[i])
for i in 4, 5:
line[i] = float(line[i])
# CSV-File ausgeben
with open('mitarbeiter.csv', 'w') as csv_file:
csv.writer(csv_file).writerows(data)
# auch ein DictReader und DictWriter ist nutzbar
feldnamen = data[0]
data_list = list(csv.DictReader(lines, feldnamen))
pprint(data_list)
# Ausgabe via DictWriter
with open('mitarbeiter2.csv', 'w') as csv_file:
csv.DictWriter(csv_file, fieldnames = feldnamen).writerows(data_list)
# ein Generator, der die Spaltenüberschriften groß schreibt und den Rest
# der Daten aus der data_list ab Zeile 2 (die als Sub-Generator dient) holt
def data_gen():
yield dict((name, name.upper()) for name in feldnamen)
yield from data_list[1:]
# alternative Ausgabe über DictWriter unter Nutzung des Generators
with open('mitarbeiter3.csv', 'w') as csv_file:
csv.DictWriter(csv_file, fieldnames = feldnamen).writerows(data_gen())
# Excel-File ausgeben
write_excel(data, 'mitarbeiter.xlsx')
PETL - CSV parsen und generieren
# http://petl.readthedocs.io/en/latest/intro.html
import petl as etl
example_data = """foo,bar,baz
a,1,3.4
b,2,7.4
c,6,2.2
d,9,8.1
"""
with open('example.csv', 'w') as f:
f.write(example_data)
#table1 = etl.fromcsv('example.csv', encoding = "utf-8")
table1 = etl.fromcsv('example.csv')
table2 = etl.convert(table1, 'foo', 'upper')
table3 = etl.convert(table2, 'bar', int)
table4 = etl.convert(table3, 'baz', float)
table5 = etl.addfield(table4, 'quux', lambda row: row.bar * row.baz)
print(etl.look(table5))
table = (
etl
.fromcsv('example.csv')
.convert('foo', 'upper')
.convert('bar', int)
.convert('baz', float)
.addfield('quux', lambda row: row.bar + row.baz)
)
print(table.look())
# 2 Spalten konvertieren
table = table.convert('foo', 'replace', 'A', '*A*').convert('bar', bool)
print(table.look())
# Spalte bar löschen
table = table.cutout('bar')
print(table.look())
# Spalte foo mittels eigener Funktion konvertieren
table = table.convert('foo', lambda t: 2 * t)
print(table.look())
# aus einem Container eine Table erstellen
l = [['foo', 'bar'], ['a', 1], ['b', 2], ['c,C', 2.9]]
table = etl.wrap(l)
print(table.look())
print(table.tocsv())
PETL - RSS-Feeds parsen
import petl as etl
for url in ('https://www.tu-chemnitz.de/uk/pressestelle/aktuell/aktuell-full.rss',
'https://blog.hrz.tu-chemnitz.de/bibo/feed/',
'https://blog.hrz.tu-chemnitz.de/urzcommunity/feed/',
'https://blog.hrz.tu-chemnitz.de/botschafter/feed/',
'https://blog.hrz.tu-chemnitz.de/bau/feed/',
'https://www.tu-chemnitz.de/tu/termine/tu-termine.rss'):
table = etl.fromxml(url, 'channel', 'title')
#print(table.look(style = 'minimal'))
print(next(iter(etl.columns(table))))
table = etl.fromxml(url, 'channel', 'description')
print(next(iter(etl.columns(table))))
print()
PETL - Abfrage einer MariaDB-Tabelle der Uni.Bibliografie
import petl as etl
import MySQLdb
connection = MySQLdb.connect(host = 'opac.bibliothek.tu-chemnitz.de', user = 'opusread', passwd = 'opusread', db = 'unibiblio')
#mkcursor = lambda: connection.cursor()
table = etl.fromdb(mkcursor if 0 else connection, 'SELECT * FROM resource_type_de')
print(table.lookall())