Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions Lib/asyncio/windows_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import msvcrt
import os
import subprocess
import tempfile
import warnings


Expand All @@ -24,17 +23,14 @@
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
_mmap_counter = itertools.count()
_MAX_PIPE_ATTEMPTS = 20


# Replacement for os.pipe() using handles instead of fds


def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):
"""Like os.pipe() but with overlapped support and using handles not fds."""
address = tempfile.mktemp(
prefix=r'\\.\pipe\python-pipe-{:d}-{:d}-'.format(
os.getpid(), next(_mmap_counter)))

if duplex:
openmode = _winapi.PIPE_ACCESS_DUPLEX
access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
Expand All @@ -56,9 +52,20 @@ def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):

h1 = h2 = None
try:
h1 = _winapi.CreateNamedPipe(
address, openmode, _winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
for attempts in itertools.count():
address = r'\\.\pipe\python-pipe-{:d}-{:d}-{}'.format(
os.getpid(), next(_mmap_counter), os.urandom(8).hex())
try:
h1 = _winapi.CreateNamedPipe(
address, openmode, _winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
break
except OSError as e:
if attempts >= _MAX_PIPE_ATTEMPTS:
raise
if e.winerror not in (_winapi.ERROR_PIPE_BUSY,
_winapi.ERROR_ACCESS_DENIED):
raise

h2 = _winapi.CreateFile(
address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
Expand Down
58 changes: 40 additions & 18 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
CONNECTION_TIMEOUT = 20.

_mmap_counter = itertools.count()
_MAX_PIPE_ATTEMPTS = 100

default_family = 'AF_INET'
families = ['AF_INET']
Expand Down Expand Up @@ -76,8 +77,8 @@ def arbitrary_address(family):
elif family == 'AF_UNIX':
return tempfile.mktemp(prefix='sock-', dir=util.get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
(os.getpid(), next(_mmap_counter)), dir="")
return (r'\\.\pipe\pyc-%d-%d-%s' %
(os.getpid(), next(_mmap_counter), os.urandom(8).hex()))
else:
raise ValueError('unrecognized family')

Expand Down Expand Up @@ -455,17 +456,29 @@ class Listener(object):
def __init__(self, address=None, family=None, backlog=1, authkey=None):
family = family or (address and address_type(address)) \
or default_family
address = address or arbitrary_address(family)

_validate_family(family)
if authkey is not None and not isinstance(authkey, bytes):
raise TypeError('authkey should be a byte string')

if family == 'AF_PIPE':
self._listener = PipeListener(address, backlog)
if address:
self._listener = PipeListener(address, backlog)
else:
for attempts in itertools.count():
address = arbitrary_address(family)
try:
self._listener = PipeListener(address, backlog)
break
except OSError as e:
if attempts >= _MAX_PIPE_ATTEMPTS:
raise
if e.winerror not in (_winapi.ERROR_PIPE_BUSY,
_winapi.ERROR_ACCESS_DENIED):
raise
else:
address = address or arbitrary_address(family)
self._listener = SocketListener(address, family, backlog)

if authkey is not None and not isinstance(authkey, bytes):
raise TypeError('authkey should be a byte string')

self._authkey = authkey

def accept(self):
Expand Down Expand Up @@ -553,7 +566,6 @@ def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
address = arbitrary_address('AF_PIPE')
if duplex:
openmode = _winapi.PIPE_ACCESS_DUPLEX
access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
Expand All @@ -563,15 +575,25 @@ def Pipe(duplex=True):
access = _winapi.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE

h1 = _winapi.CreateNamedPipe(
address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
_winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
_winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
_winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
# default security descriptor: the handle cannot be inherited
_winapi.NULL
)
for attempts in itertools.count():
address = arbitrary_address('AF_PIPE')
try:
h1 = _winapi.CreateNamedPipe(
address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
_winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
_winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
_winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
# default security descriptor: the handle cannot be inherited
_winapi.NULL
)
break
except OSError as e:
if attempts >= _MAX_PIPE_ATTEMPTS:
raise
if e.winerror not in (_winapi.ERROR_PIPE_BUSY,
_winapi.ERROR_ACCESS_DENIED):
raise
h2 = _winapi.CreateFile(
address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
_winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Get rid of any possibility of a name conflict for named pipes in
:mod:`multiprocessing` and :mod:`asyncio` on Windows, no matter how small.