Generators and Coroutines

Generator function

# -*- coding: utf8 -*-

def my_generator(n=5):
  """generates the sequence of the first n natural numbers"""
  assert n > 0, 'n must be greater than 0'
  i = 1
  while i <= n:
    # yield instead of return
    yield i
    i += 1

for j in my_generator(8): print(j)
print('')
for j in my_generator(): print(j)
print('')

try:
  for j in my_generator(0): print(j)
except AssertionError as e:
  print('Exception: %s/%s' % (type(e), e))

# without for: explicitly with next() and catching the StopIteration
generator = my_generator(5)
while True:
  try:
    print(next(generator))
  except StopIteration:
    break

print('Generator exhausted')

# the StopIteration can be avoided by passing a default argument to next()
generator = my_generator(3)
while True:
    item = next(generator, None)
    if item is None:
        break
    print(item)
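
For comparison, a minimal sketch of the same iteration written as a hand-made iterator class (hypothetical name MyIterator); the generator function above produces this machinery (__iter__, __next__, raising StopIteration) automatically:

# hand-written equivalent of my_generator (sketch)
class MyIterator:
    def __init__(self, n=5):
        assert n > 0, 'n must be greater than 0'
        self.n = n
        self.i = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.i >= self.n:
            raise StopIteration
        self.i += 1
        return self.i

for j in MyIterator(3): print(j)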

Coroutine

# -*- coding: utf8 -*-

import sys

def coroutine():
    """coroutine that yields values via yield and receives values back"""
    print('ready')
    value = 5
    while True:
      # here yield produces a value and reads back one that was passed in via
      # send() or next(), where next() always passes None;
      # when the result of yield is used, the yield expression has to be
      # parenthesized unless it is the only operand on the right-hand side of
      # an assignment; the parentheses below could therefore be omitted
      value = (yield value if value > 20 else value + 1)

# create the generator/coroutine
c = coroutine()

# start the generator/coroutine, i.e. run it up to the first yield
if len(sys.argv) == 1:
    # variant 1 with next():
    # next() exists in both Python 2 and 3 and calls the iterator method
    # c.next() on Python 2 and c.__next__() on Python 3
    print('next received %s' % next(c))
else:
    # variant 2 with send(None) instead of next()
    print('send(None) received %s' % c.send(None))

# now send values with send() and read back the yield results
for i in 3, 17, 4, 81:
    print('send(%s) received %s' % (i, c.send(i)))
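
Besides send(), a coroutine can be terminated with close() or handed an exception with throw(); a minimal sketch reusing coroutine() from above:

# close() raises GeneratorExit at the suspended yield and ends the coroutine;
# a further send() then raises StopIteration
c2 = coroutine()
next(c2)            # run up to the first yield
print(c2.send(7))   # normal round trip
c2.close()
try:
    c2.send(1)
except StopIteration:
    print('coroutine closed')

# throw() raises the given exception at the suspended yield; since the
# coroutine does not catch it, it propagates back to the caller
c3 = coroutine()
next(c3)
try:
    c3.throw(ValueError('injected'))
except ValueError as e:
    print('coroutine did not handle:', e)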

Sub-generators and yield from

# example of yield from

# sub-generator 1
def subgen1():
    yield 1
    yield 2
    return 3

# sub-generator 2
def subgen2():
    yield 11
    yield 22
    return 33

# generator that yields the values of two sub-generators
def gen():
    # yield all values of sub-generator 1; its return value appears as the
    # value of the yield-from expression
    print('subgen1 returned', (yield from subgen1()))
    # the same for sub-generator 2
    print('subgen2 returned', (yield from subgen2()))

# iterate over the generator
for x in gen():
    print(x)

# iterate over sub-generator 1 with for (the return value does not show up here)
print()
for x in subgen1():
    print(x)

# the return value is carried as the value attribute of the StopIteration
print()
s = subgen1()
try:
    while True: print(next(s))
except StopIteration as e:
    print('return value', e.value)
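
yield from not only forwards the sub-generator's values to the caller, it also routes send() (and throw()) through to the sub-generator; a minimal sketch with hypothetical generators echo_sub() and delegator():

# sub-generator that echoes back whatever is sent to it
def echo_sub():
    received = yield 'sub ready'
    while True:
        received = yield 'sub got %s' % received

# delegating generator; send() on it is forwarded to echo_sub()
def delegator():
    yield from echo_sub()

d = delegator()
print(next(d))          # sub ready
print(d.send('hello'))  # sub got hello
print(d.send(42))       # sub got 42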

Simple coroutines

# decorator that returns a coroutine already advanced to its first yield via next()
def coroutine(f):
    def start(*args, **kwargs):
        gen = f(*args, **kwargs)
        next(gen) # next() is called here already, so callers of the coroutine can skip it
        return gen
    return start

# add a fixed amount to each received number and pass it on
@coroutine
def add(zahl, ziel):
    while True:
        ziel.send(zahl + (yield))

# square each received number and pass it on
@coroutine
def quadrat(ziel):
    while True:
        ziel.send((yield) ** 2)

# print the received numbers
@coroutine
def ausgabe():
    while True:
        print((yield))

f = quadrat(add(10, ausgabe()))

# square each number of a sequence, add 10, and print it
for d in range(5):
    f.send(d)
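
A pipeline stage can also filter instead of transform; a minimal sketch of an additional stage (hypothetical name even_only) placed in front of the existing quadrat/add/ausgabe chain:

# only pass even numbers downstream, drop the rest
@coroutine
def even_only(ziel):
    while True:
        zahl = (yield)
        if zahl % 2 == 0:
            ziel.send(zahl)

# even numbers are squared, increased by 10 and printed; odd ones are dropped
g = even_only(quadrat(add(10, ausgabe())))
for d in range(5):
    g.send(d)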

Asyncio

Echo client

# Echo Client
#
# see also
#   18.5.5. Streams (coroutine based API)
#   https://docs.python.org/3/library/asyncio-stream.html

import asyncio

data = """A few lines of data
including non-ASCII characters: €£
to test the operation
of both server
and client."""

async def echo_client(data):
    reader, writer = await asyncio.open_connection('localhost', 8881)
    print('Connected to server')
    for line in data:
        writer.write(line.encode())
        print('Sent:', line)
        response = await reader.read(1024)
        print('Recv:', response.decode())
    writer.close()
    print('Disconnected from server')

loop = asyncio.get_event_loop()
loop.run_until_complete(echo_client(data.splitlines()))
loop.close()
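
Since Python 3.7 the event loop can be managed by asyncio.run() instead of get_event_loop()/run_until_complete()/close(); a minimal sketch of the equivalent driver code, reusing echo_client() and data from above:

# Python 3.7+: asyncio.run() creates, runs and closes the event loop
asyncio.run(echo_client(data.splitlines()))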

Echo server

# Echo Server
#
# see also
#   18.5.5. Streams (coroutine based API)
#   https://docs.python.org/3/library/asyncio-stream.html

import asyncio

async def handle(reader, writer):
    address = writer.get_extra_info('peername')
    print('Connected from', address)

    while True:
        data = await reader.read(1024)
        if not data: break
        s = data.decode()
        print('Recv:', address, s)
        writer.write(data)
        await writer.drain()
        print('Echo:', address, s)
        await asyncio.sleep(3)

    writer.close()
    print('Disconnected from', address)

loop = asyncio.get_event_loop()
echo = asyncio.start_server(handle, 'localhost', 8881)
server = loop.run_until_complete(echo)

print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
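
For the server, Python 3.7+ offers asyncio.run() together with Server.serve_forever(); a minimal sketch of an equivalent main program, reusing handle() from above:

# Python 3.7+: start the server inside a coroutine and serve until interrupted
async def main():
    server = await asyncio.start_server(handle, 'localhost', 8881)
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    async with server:
        await server.serve_forever()

try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass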

gevent - Green Threads

# https://hackernoon.com/asynchronous-python-45df84b82434

import gevent.monkey
gevent.monkey.patch_all() # see http://www.gevent.org/gevent.monkey.html
from urllib.request import urlopen

urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']

# Green Thread
def print_head(url):
    print('Starting {}'.format(url))
    data = urlopen(url).read()
    print('{}: {} bytes: {}'.format(url, len(data), data[:100]))

jobs = [gevent.spawn(print_head, _url) for _url in urls]

gevent.wait(jobs)
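
gevent.spawn() returns Greenlet objects whose results can be collected after joining; a minimal sketch using gevent.joinall() and the value attribute, with a hypothetical green thread get_length() that returns instead of prints:

# collect return values of green threads instead of printing inside them
def get_length(url):
    return url, len(urlopen(url).read())

greenlets = [gevent.spawn(get_length, u) for u in urls]
gevent.joinall(greenlets, timeout=10)
for g in greenlets:
    if g.successful():
        print('{} ==> {} bytes'.format(*g.value))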

Tornado

# https://hackernoon.com/asynchronous-python-45df84b82434

import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']

# Callback
def handle_response(response):
    global requests
    if response.error:
        print("Error:", response.error)
    else:
        url = response.request.url
        data = response.body
        print('{}: {} bytes: {}'.format(url, len(data), data[:100]))

    requests -= 1
    if not requests:
        tornado.ioloop.IOLoop.instance().stop()

http_client = AsyncHTTPClient()
for url in urls:
    http_client.fetch(url, handle_response)
    print(url, 'ready')
    
requests = len(urls)

tornado.ioloop.IOLoop.instance().start()
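
Tornado 6.0 removed the callback argument from AsyncHTTPClient.fetch(); the same fan-out can be written with coroutines instead. A minimal sketch (hypothetical fetch_all()), assuming Tornado 5 or newer, using tornado.gen.multi and IOLoop.run_sync:

# coroutine-based variant: await all fetches concurrently
import tornado.ioloop
from tornado import gen
from tornado.httpclient import AsyncHTTPClient

async def fetch_all():
    client = AsyncHTTPClient()
    responses = await gen.multi([client.fetch(url) for url in urls])
    for response in responses:
        print('{}: {} bytes'.format(response.request.url, len(response.body)))

tornado.ioloop.IOLoop.current().run_sync(fetch_all)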

@asyncio.coroutine

# https://hackernoon.com/asynchronous-python-45df84b82434

import asyncio
import aiohttp # https://docs.aiohttp.org/en/stable/client.html

urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']

# legacy style: @asyncio.coroutine is deprecated since Python 3.8 and was
# removed in Python 3.11; see the async/await variant below
@asyncio.coroutine
def call_url(session, url):
    print('Starting {}'.format(url))
    response = yield from session.get(url)
    data = yield from response.text()
    print('{}: {} bytes: {}'.format(url, len(data), data[:100]))
    return data

loop = asyncio.get_event_loop()
with aiohttp.ClientSession(loop = loop) as session:
    futures = [call_url(session, url) for url in urls]
    loop.run_until_complete(asyncio.wait(futures))

async and await

# https://hackernoon.com/asynchronous-python-45df84b82434

import asyncio
import aiohttp # https://docs.aiohttp.org/en/stable/client.html

urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']

async def call_url(session, url):
    print('Starting {}'.format(url))
    response = await session.get(url)
    data = await response.text()
    print('{}: {} bytes: {}'.format(url, len(data), data[:100]))
    return url, data

loop = asyncio.get_event_loop()
# note: current aiohttp versions expect the session to be created inside a
# coroutine and used with 'async with' (see the sketch after this example)
with aiohttp.ClientSession(loop = loop) as session:
    # wrap the coroutine objects in Tasks; passing the bare coroutine objects
    # (call_url(session, url)) would also work, since asyncio.wait() wraps
    # coroutines in Tasks itself
    futures = [loop.create_task(call_url(session, url)) for url in urls]
    print('futures', futures)
    done, pending = loop.run_until_complete(asyncio.wait(futures))
      # Wait for the Futures and coroutine objects given by the sequence
      # futures to complete. Coroutines will be wrapped in Tasks. Returns two
      # sets of Future: (done, pending).
    print('done', done)
    print('pending', pending)

    for task in done:
        url, data = task.result()
        print("{} ==> {} bytes".format(url, len(data)))

loop.close()
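
The aiohttp documentation recommends creating the ClientSession inside a coroutine and using async with; combined with asyncio.run() and asyncio.gather() the example becomes the following minimal sketch, reusing call_url() and urls from above:

# Python 3.7+ / current aiohttp style: async context manager plus gather()
async def main():
    async with aiohttp.ClientSession() as session:
        # gather() keeps the order of urls and returns the results directly
        results = await asyncio.gather(*(call_url(session, url) for url in urls))
    for url, data in results:
        print('{} ==> {} bytes'.format(url, len(data)))

asyncio.run(main())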

octopus

parallel URL requests with threads

# https://www.twilio.com/blog/2016/12/http-requests-in-python-3.html

import json
from octopus import Octopus

def create_request(urls):
    data = []
 
    otto = Octopus(concurrency = 4, auto_start = True, cache = True, expiration_in_seconds = 10)
 
    def handle_url_response(url, response):
        if 'Not found' == response.text:
            print('URL Not Found: %s' % url)
        else:
            data.append(response.text)
 
    for url in urls:
        otto.enqueue(url, handle_url_response)
 
    otto.wait()
 
    for idx, d in enumerate(data, 1):
        json_data = json.dumps(json.loads(d), indent = 2)
        # write each result to its own file; 'with' ensures the file is closed
        with open('json_out_%d.json' % idx, 'w') as out:
            print(json_data, file = out)
 
create_request([
     'https://www-apps.hrz.tu-chemnitz.de/tucrooms/api/v1/building/?public=&search=Rh',
     'https://www-apps.hrz.tu-chemnitz.de/tucrooms/api/v1/building/?public=1',
])

concurrent.futures

Using the ProcessPoolExecutor

# Python in a nutshell, 3rd edition
# https://katalog.bibliothek.tu-chemnitz.de/Record/0018703039
#
# Chapter 14. Threads and Processes
#   The concurrent.futures Module

import concurrent.futures as cf
import time

def f(s):
    """run a long time and eventually return a result"""
    print('f started:', s)
    if s.startswith('a'):
        print('sleeping on', s)
        time.sleep(5)
    else:
        print('not sleeping on', s)
    print('f finished:', s)
    return s.title()

def runner(s):
    return s, f(s)

def make_dict(set_of_strings):
    with cf.ProcessPoolExecutor() as e:
        d = dict(e.map(runner, set_of_strings))
    return d

if __name__ == '__main__':
    # guard is required on platforms that spawn worker processes (Windows, macOS);
    # otherwise each worker would re-execute this call when importing the module
    print(make_dict({'aa', 'bb', 'cc'}))
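
Instead of map(), the executor can also hand out futures via submit(), which allows processing results as soon as they finish; a minimal sketch (hypothetical make_dict_as_completed()) reusing runner() from above:

# submit() returns one Future per call; as_completed() yields them as they finish
def make_dict_as_completed(set_of_strings):
    d = {}
    with cf.ProcessPoolExecutor() as e:
        futures = [e.submit(runner, s) for s in set_of_strings]
        for future in cf.as_completed(futures):
            key, value = future.result()
            d[key] = value
    return d

if __name__ == '__main__':
    print(make_dict_as_completed({'aa', 'bb', 'cc'}))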

Comparison: linear, ThreadPoolExecutor, ProcessPoolExecutor

# Effective Python: 59 Specific Ways to Write Better Python
# https://katalog.bibliothek.tu-chemnitz.de/Record/0015257278
#
# Item 41: Consider concurrent.futures for True Parallelism

from time import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [(1963309, 2265973), (2030677, 3814172), (1551645, 2229620), (2039045, 2020802)]

print('linear')
start = time()
results = list(map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))

print('thread pool')
start = time()
pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))

print('process pool')
start = time()
# note: on platforms that spawn worker processes (Windows, macOS) this part
# must run under an 'if __name__ == "__main__":' guard
pool = ProcessPoolExecutor(max_workers=2)  # The one change
results = list(pool.map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))
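
On platforms that spawn worker processes (Windows, macOS) the whole benchmark has to sit under a main guard; a minimal sketch of the same comparison restructured around a hypothetical helper benchmark(), reusing gcd() and numbers:

# same comparison, restructured so it is safe under the spawn start method
def benchmark(label, map_func):
    start = time()
    list(map_func(gcd, numbers))
    print('%s took %.3f seconds' % (label, time() - start))

if __name__ == '__main__':
    benchmark('linear', map)
    with ThreadPoolExecutor(max_workers=2) as pool:
        benchmark('thread pool', pool.map)
    with ProcessPoolExecutor(max_workers=2) as pool:
        benchmark('process pool', pool.map)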