Generatoren und Koroutinen
Generator-Funktion
# -*- coding: utf8 -*-
def my_generator(n = 5):
    """Yield the first n natural numbers (1 .. n).

    Raises AssertionError for non-positive n; kept as an assert because
    callers of this example explicitly catch AssertionError.
    """
    assert n > 0, 'n muss größer 0 sein'
    # range() replaces the manual while-loop/increment of the original
    for value in range(1, n + 1):
        # yield instead of return: produce one value per iteration
        yield value
# Consume the generator in several different ways.
for number in my_generator(8):
    print(number)
print('')
for number in my_generator():
    print(number)
print('')
# A non-positive n triggers the assert inside the generator body.
try:
    for number in my_generator(0):
        print(number)
except AssertionError as e:
    print('Ausnahme: %s/%s' % (type(e), e))
# Without for: drive the generator explicitly with next() and catch
# the StopIteration that signals exhaustion.
generator = my_generator(5)
while True:
    try:
        print(next(generator))
    except StopIteration:
        break
print('Generator erschöpft')
# StopIteration can be avoided by handing next() a default value
# that is returned once the generator is exhausted.
generator = my_generator(3)
while True:
    item = next(generator, None)
    if item is None:
        break
    print(item)
Koroutine
# -*- coding: utf8 -*-
import sys
def coroutine():
    """Coroutine that both yields values to the caller and receives
    values pushed in via send()."""
    print('bereit')
    value = 5
    while True:
        # Decide what to hand out: values above 20 are passed through
        # unchanged, smaller ones are incremented first.
        outgoing = value if value > 20 else value + 1
        # yield delivers `outgoing` and then suspends; whatever the caller
        # passes via send() (None when next() is used) becomes the result
        # of the yield expression and is bound to `value` for the next turn.
        value = yield outgoing
# Create the generator/coroutine object; the body does not run yet.
c = coroutine()
# Prime the coroutine, i.e. execute it up to the first yield.
if len(sys.argv) == 1:
    # Variant 1 with next(): exists on Python 2 and 3 alike and calls the
    # iterator method c.next() on Python 2 / c.__next__() on Python 3.
    print('next empfing %s' % next(c))
else:
    # Variant 2: send(None) is equivalent to next() for priming.
    print('send(None) empfing %s' % c.send(None))
# Now push values in with send() and read the yield results back.
for i in (3, 17, 4, 81):
    print('send(%s) empfing %s' % (i, c.send(i)))
Sub-Generatoren und yield from
# Beispiel für yield from
# Sub-generator 1
def subgen1():
    """Yield 1 and 2; the return value 3 travels as the value of the
    StopIteration (and of a delegating yield from)."""
    yield from (1, 2)
    return 3
# Sub-generator 2
def subgen2():
    """Yield 11 and 22; the return value 33 travels via StopIteration."""
    yield from (11, 22)
    return 33
# Generator that yields the values of two sub-generators in turn.
def gen():
    """Delegate to subgen1 and subgen2 via yield from.

    yield from forwards every value of the sub-generator; the
    sub-generator's return value becomes the value of the yield from
    expression and is printed here.
    """
    result = yield from subgen1()
    print('subgen1 returned', result)
    result = yield from subgen2()
    print('subgen2 returned', result)
# Iterate the delegating generator.
for item in gen():
    print(item)
# Iterate sub-generator 1 with for (the return value does not show up here).
print()
for item in subgen1():
    print(item)
# The return value is carried as the value attribute of StopIteration.
print()
sub = subgen1()
try:
    while True:
        print(next(sub))
except StopIteration as e:
    print('return value', e.value)
Einfache Coroutinen
# Decorator that returns a coroutine already started via next()
def coroutine(f):
    """Wrap generator function f so calling it yields a primed coroutine.

    The wrapper creates the generator and immediately advances it to the
    first yield, so callers can use send() right away without an explicit
    next() call.
    """
    from functools import wraps  # local import keeps the block self-contained

    @wraps(f)  # preserve f's name/docstring on the wrapper (original lost them)
    def start(*args, **kwargs):
        gen = f(*args, **kwargs)
        next(gen)  # prime here so it is not needed at the call site
        return gen
    return start
# Increase every received number by a fixed offset and forward the result.
@coroutine
def add(zahl, ziel):
    while True:
        received = yield
        ziel.send(zahl + received)
# Square every received number and forward the result.
@coroutine
def quadrat(ziel):
    while True:
        received = yield
        ziel.send(received ** 2)
# Print every received number (the sink of the pipeline).
@coroutine
def ausgabe():
    while True:
        received = yield
        print(received)
# Pipeline: square each number of a sequence, add 10, print the result.
f = quadrat(add(10, ausgabe()))
for value in range(5):
    f.send(value)
Asyncio
Echo-Klient
# Echo Server
#
# siehe auch
# 18.5.5. Streams (coroutine based API)
# https://docs.python.org/3/library/asyncio-stream.html
import asyncio
# Sample payload: several lines including non-ASCII characters, to
# exercise the encode()/decode() round trip through the echo server.
data = """A few lines of data
including non-ASCII characters: €£
to test the operation
of both server
and client."""
async def echo_client(data):
    """Send each line of data to the echo server on localhost:8881 and
    print every echoed response."""
    reader, writer = await asyncio.open_connection('localhost', 8881)
    print('Connected to server')
    for line in data:
        writer.write(line.encode())
        print('Sent:', line)
        reply = await reader.read(1024)
        print('Recv:', reply.decode())
    writer.close()
    print('Disconnected from server')
# Run the client until it finishes, then dispose of the loop.
# NOTE(review): manual get_event_loop()/run_until_complete() is the
# pre-asyncio.run() style, kept as in the original example.
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(echo_client(data.splitlines()))
event_loop.close()
Echo-Server
# Echo Server
#
# siehe auch
# 18.5.5. Streams (coroutine based API)
# https://docs.python.org/3/library/asyncio-stream.html
import asyncio
async def handle(reader, writer):
    """Serve one echo client: read chunks, echo each back, then close."""
    address = writer.get_extra_info('peername')
    print('Connected from', address)
    while True:
        chunk = await reader.read(1024)
        if not chunk:
            # an empty read means the peer closed the connection
            break
        text = chunk.decode()
        print('Recv:', address, text)
        writer.write(chunk)
        await writer.drain()
        print('Echo:', address, text)
        # artificial delay, e.g. to observe several concurrent clients
        await asyncio.sleep(3)
    writer.close()
    print('Disconnected from', address)
# Start the server, run until interrupted, then shut down cleanly.
event_loop = asyncio.get_event_loop()
factory = asyncio.start_server(handle, 'localhost', 8881)
server = event_loop.run_until_complete(factory)
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
server.close()
event_loop.run_until_complete(server.wait_closed())
event_loop.close()
gevent - Green Threads
# https://hackernoon.com/asynchronous-python-45df84b82434
import gevent.monkey
gevent.monkey.patch_all() # siehe http://www.gevent.org/gevent.monkey.html
from urllib.request import urlopen
urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']
# Green-thread body: fetch one URL and report its size and first bytes.
def print_head(url):
    print('Starting {}'.format(url))
    body = urlopen(url).read()
    print('{}: {} bytes: {}'.format(url, len(body), body[:100]))
# Spawn one green thread per URL and block until all of them are done.
greenlets = [gevent.spawn(print_head, target) for target in urls]
gevent.wait(greenlets)
Tornado
# https://hackernoon.com/asynchronous-python-45df84b82434
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']
# Callback invoked by Tornado once a fetch has finished.
def handle_response(response):
    global requests  # module-level count of outstanding requests
    if not response.error:
        url = response.request.url
        body = response.body
        print('{}: {} bytes: {}'.format(url, len(body), body[:100]))
    else:
        print("Error:", response.error)
    requests -= 1
    if not requests:
        # all responses are in: stop the IO loop
        tornado.ioloop.IOLoop.instance().stop()
# Kick off all fetches, remember how many answers to expect, start the loop.
client = AsyncHTTPClient()
for url in urls:
    client.fetch(url, handle_response)
    print(url, 'ready')
# outstanding-request counter, decremented by the callback
requests = len(urls)
tornado.ioloop.IOLoop.instance().start()
# https://hackernoon.com/asynchronous-python-45df84b82434
import asyncio
import aiohttp # https://docs.aiohttp.org/en/stable/client.html
urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']
# NOTE(review): the original had a second, orphaned @asyncio.coroutine line
# above the comments — a SyntaxError — which is removed here. Also,
# asyncio.coroutine is deprecated since 3.8 and removed in 3.11; it is shown
# only to contrast with the async/await version below.
@asyncio.coroutine
def call_url(session, url):
    """Fetch url through the aiohttp session and return the response body."""
    print('Starting {}'.format(url))
    response = yield from session.get(url)
    data = yield from response.text()
    print('{}: {} bytes: {}'.format(url, len(data), data[:100]))
    return data
# Run all fetches concurrently on one shared session.
event_loop = asyncio.get_event_loop()
with aiohttp.ClientSession(loop = event_loop) as session:
    coros = [call_url(session, url) for url in urls]
    event_loop.run_until_complete(asyncio.wait(coros))
async und await
# https://hackernoon.com/asynchronous-python-45df84b82434
import asyncio
import aiohttp # https://docs.aiohttp.org/en/stable/client.html
urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']
async def call_url(session, url):
    """Fetch url with the given aiohttp session; return (url, body_text)."""
    print('Starting {}'.format(url))
    response = await session.get(url)
    body = await response.text()
    print('{}: {} bytes: {}'.format(url, len(body), body[:100]))
    return url, body
# Run all fetches concurrently and inspect the (done, pending) result sets.
loop = asyncio.get_event_loop()
# NOTE(review): a plain `with` on ClientSession is the old aiohttp API;
# current aiohttp requires `async with` inside a coroutine — confirm against
# the aiohttp version in use.
with aiohttp.ClientSession(loop = loop) as session:
    # Wrap each coroutine in a Task so it is scheduled on the loop.
    # (The original used a dead `if 1 else lambda x: x` conditional and a
    # commented-out variant; both removed.)
    futures = [loop.create_task(call_url(session, url)) for url in urls]
    print('futures', futures)
    # asyncio.wait waits for all futures/coroutines and returns two sets
    # of Futures: (done, pending).
    done, pending = loop.run_until_complete(asyncio.wait(futures))
    print('done', done)
    print('pending', pending)
    for task in done:
        url, data = task.result()
        print("{} ==> {} bytes".format(url, len(data)))
loop.close()
octopus
parallele URL-Zugriffe mit Threads
# https://www.twilio.com/blog/2016/12/http-requests-in-python-3.html
import json
from octopus import Octopus
def create_request(urls):
    """Fetch all urls concurrently with octopus and write each JSON
    response, pretty-printed, to its own json_out_<n>.json file."""
    data = []
    otto = Octopus(concurrency = 4, auto_start = True, cache = True, expiration_in_seconds = 10)
    def handle_url_response(url, response):
        # octopus reports a failed lookup via the literal body 'Not found'
        if response.text == 'Not found':
            print('URL Not Found: %s' % url)
        else:
            data.append(response.text)
    for url in urls:
        otto.enqueue(url, handle_url_response)
    otto.wait()
    for idx, d in enumerate(data, 1):
        json_data = json.dumps(json.loads(d), indent = 2)
        # use a context manager so the file handle is closed deterministically
        # (the original leaked the handle returned by open())
        with open('json_out_%d.json' % idx, 'w') as out:
            print(json_data, file = out)
# Fetch two building queries from the TU Chemnitz rooms API.
query_urls = [
    'https://www-apps.hrz.tu-chemnitz.de/tucrooms/api/v1/building/?public=&search=Rh',
    'https://www-apps.hrz.tu-chemnitz.de/tucrooms/api/v1/building/?public=1',
]
create_request(query_urls)
concurrent.futures
Nutzung des ProcessPoolExecutors
# Python in a nutshell, 3rd edition
# https://katalog.bibliothek.tu-chemnitz.de/Record/0018703039
#
# Chapter 14. Threads and Processes
# The concurrent.futures Module
import concurrent.futures as cf
import time
def f(s):
    """Simulate long-running work and return the title-cased input.

    Strings starting with 'a' sleep for five seconds; others return quickly.
    """
    print('f started:', s)
    if not s.startswith('a'):
        print('not sleeping on', s)
    else:
        print('sleeping on', s)
        time.sleep(5)
    print('f finished:', s)
    return s.title()
def runner(s):
    """Pair the input with f's result so e.map can build a dict from it."""
    result = f(s)
    return s, result
def make_dict(set_of_strings):
    """Map each string to f(string) in parallel using a process pool."""
    with cf.ProcessPoolExecutor() as executor:
        # dict() consumes the map iterator while the pool is still open
        return dict(executor.map(runner, set_of_strings))
print(make_dict({'aa', 'bb', 'cc'}))
Vergleich linear, ThreadPoolExecutor, ProcessPoolExecutor
# Effective Python: 59 Specific Ways to Write Better Python
# https://katalog.bibliothek.tu-chemnitz.de/Record/0015257278
#
# Item 41: Consider concurrent.futures for True Parallelism
from time import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def gcd(pair):
    """Return the greatest common divisor of the two numbers in pair.

    Deliberately brute force (counting down from the smaller number) so
    the call is CPU-bound — that is the point of the timing comparison.
    """
    a, b = pair
    for candidate in range(min(a, b), 0, -1):
        if a % candidate == 0 and b % candidate == 0:
            return candidate
numbers = [(1963309, 2265973), (2030677, 3814172), (1551645, 2229620), (2039045, 2020802)]
def _timed_run(label, mapper):
    """Print label, apply mapper(gcd, numbers), report the elapsed time."""
    print(label)
    start = time()
    results = list(mapper(gcd, numbers))
    end = time()
    print('Took %.3f seconds' % (end - start))
    return results
results = _timed_run('linear', map)
results = _timed_run('thread pool', ThreadPoolExecutor(max_workers=2).map)
# swapping in ProcessPoolExecutor is the one change for true parallelism
results = _timed_run('process pool', ProcessPoolExecutor(max_workers=2).map)