import asyncio
|
|
|
|
from .log import internal_logger
|
|
|
|
|
|
class BaseProtocol(asyncio.Protocol):
    """Protocol base class tracking the transport and write flow control.

    The event loop's ``pause_writing`` / ``resume_writing`` callbacks toggle
    an internal flag; ``_drain_helper`` lets writers wait until writing is
    resumed or the connection is lost.
    """

    __slots__ = ('_loop', '_paused', '_drain_waiter',
                 '_connection_lost', 'transport')

    def __init__(self, loop=None):
        """Initialise the protocol state.

        :param loop: event loop used to create the drain future and to
            query debug mode; when ``None`` the result of
            ``asyncio.get_event_loop()`` is used.
        """
        if loop is None:
            self._loop = asyncio.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        # Future shared by every coroutine currently blocked in
        # _drain_helper(); None when nobody is waiting.
        self._drain_waiter = None
        self._connection_lost = False
        self.transport = None

    def pause_writing(self) -> None:
        """Mark writing as paused."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            internal_logger.debug("%r pauses writing", self)

    def resume_writing(self) -> None:
        """Mark writing as resumed and wake up pending drain waiters."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            internal_logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_made(self, transport) -> None:
        """Remember the transport of the newly established connection."""
        self.transport = transport

    def connection_lost(self, exc) -> None:
        """Record the loss of the connection and release drain waiters.

        :param exc: exception passed by the caller, or ``None``. When it is
            not ``None`` it is set on the pending drain future, otherwise
            the future is resolved with ``None``.
        """
        self._connection_lost = True
        self.transport = None

        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    async def _drain_helper(self) -> None:
        """Wait until writing is no longer paused.

        Returns immediately when writing is not paused. Several coroutines
        may wait at the same time; they all share one future.

        :raises ConnectionResetError: if the connection is already lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        # Reuse a pending waiter so concurrent callers do not trip over each
        # other; replace one that is already finished (e.g. cancelled from
        # outside), since it can never wake anybody up again.
        if waiter is None or waiter.done():
            waiter = self._loop.create_future()
            self._drain_waiter = waiter
        # shield() keeps the cancellation of one caller from cancelling the
        # future the remaining callers are still waiting on.
        await asyncio.shield(waiter)
|