Skip to content
23 changes: 15 additions & 8 deletions Lib/asyncio/windows_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import msvcrt
import os
import subprocess
import tempfile
import warnings


Expand All @@ -24,17 +23,14 @@
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
_mmap_counter = itertools.count()
_MAX_PIPE_ATTEMPTS = 100


# Replacement for os.pipe() using handles instead of fds


def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):
"""Like os.pipe() but with overlapped support and using handles not fds."""
address = tempfile.mktemp(
prefix=r'\\.\pipe\python-pipe-{:d}-{:d}-'.format(
os.getpid(), next(_mmap_counter)))

if duplex:
openmode = _winapi.PIPE_ACCESS_DUPLEX
access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
Expand All @@ -56,9 +52,20 @@ def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):

h1 = h2 = None
try:
h1 = _winapi.CreateNamedPipe(
address, openmode, _winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
for attempts in itertools.count():
address = r'\\.\pipe\python-pipe-{:d}-{:d}-{}'.format(
os.getpid(), next(_mmap_counter), os.urandom(8).hex())
try:
h1 = _winapi.CreateNamedPipe(
address, openmode, _winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
break
except OSError as e:
if attempts >= _MAX_PIPE_ATTEMPTS:
raise
if e.winerror not in (_winapi.ERROR_PIPE_BUSY,
_winapi.ERROR_ACCESS_DENIED):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an infinite loop retrying on this error could potentially hide an error if the system simply doesn't allow this process to create a pipe with the given address? (here and below)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not know if this is even possible (\\.\pipe\ is not a real directory which you can forbid access to), but it will not harm if limit the number of attempts.

raise

h2 = _winapi.CreateFile(
address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
Expand Down
58 changes: 40 additions & 18 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
CONNECTION_TIMEOUT = 20.

_mmap_counter = itertools.count()
_MAX_PIPE_ATTEMPTS = 100

default_family = 'AF_INET'
families = ['AF_INET']
Expand Down Expand Up @@ -78,8 +79,8 @@ def arbitrary_address(family):
elif family == 'AF_UNIX':
return tempfile.mktemp(prefix='sock-', dir=util.get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
(os.getpid(), next(_mmap_counter)), dir="")
return (r'\\.\pipe\pyc-%d-%d-%s' %
(os.getpid(), next(_mmap_counter), os.urandom(8).hex()))
else:
raise ValueError('unrecognized family')

Expand Down Expand Up @@ -472,17 +473,29 @@ class Listener(object):
def __init__(self, address=None, family=None, backlog=1, authkey=None):
family = family or (address and address_type(address)) \
or default_family
address = address or arbitrary_address(family)

_validate_family(family)
if authkey is not None and not isinstance(authkey, bytes):
raise TypeError('authkey should be a byte string')

if family == 'AF_PIPE':
self._listener = PipeListener(address, backlog)
if address:
self._listener = PipeListener(address, backlog)
else:
for attempts in itertools.count():
address = arbitrary_address(family)
try:
self._listener = PipeListener(address, backlog)
break
except OSError as e:
if attempts >= _MAX_PIPE_ATTEMPTS:
raise
if e.winerror not in (_winapi.ERROR_PIPE_BUSY,
_winapi.ERROR_ACCESS_DENIED):
raise
else:
address = address or arbitrary_address(family)
self._listener = SocketListener(address, family, backlog)

if authkey is not None and not isinstance(authkey, bytes):
raise TypeError('authkey should be a byte string')

self._authkey = authkey

def accept(self):
Expand Down Expand Up @@ -570,7 +583,6 @@ def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
address = arbitrary_address('AF_PIPE')
if duplex:
openmode = _winapi.PIPE_ACCESS_DUPLEX
access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
Expand All @@ -580,15 +592,25 @@ def Pipe(duplex=True):
access = _winapi.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE

h1 = _winapi.CreateNamedPipe(
address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
_winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
_winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
_winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
# default security descriptor: the handle cannot be inherited
_winapi.NULL
)
for attempts in itertools.count():
address = arbitrary_address('AF_PIPE')
try:
h1 = _winapi.CreateNamedPipe(
address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
_winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
_winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
_winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
# default security descriptor: the handle cannot be inherited
_winapi.NULL
)
break
except OSError as e:
if attempts >= _MAX_PIPE_ATTEMPTS:
raise
if e.winerror not in (_winapi.ERROR_PIPE_BUSY,
_winapi.ERROR_ACCESS_DENIED):
raise
h2 = _winapi.CreateFile(
address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
_winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Get rid of any possibility of a name conflict for named pipes in
:mod:`multiprocessing` and :mod:`asyncio` on Windows, no matter how small.
Loading