Commit 75d40910 authored by Kirill Smelkov's avatar Kirill Smelkov

os/signal: tests: Use SIGTERM/SIGINT on Windows

We use SIGUSR1/SIGUSR2 mainly in this test, but those signals are not
available on Windows:

        @func
        def test_signal():
            # Notify/Stop with wrong chan dtype -> panic
            _ = panics("pychan: channel type mismatch")
    >       with _:  signal.Notify(chan(2), syscall.SIGUSR1)
    E       AttributeError: module 'golang.syscall' has no attribute 'SIGUSR1'

-> Use SIGTERM/SIGINT if SIGUSR1/SIGUSR2 are not available.

Don't want to use SIGTERM/SIGINT unconditionally because those signals are
better to leave for the job control so that e.g. nxdtest can properly kill
spawned pygolang tests.
parent a5349f5d
# -*- coding: utf-8 -*-
# Copyright (C) 2021-2022 Nexedi SA and Contributors.
# Copyright (C) 2021-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -36,6 +36,16 @@ dir_os = dirname(__file__) # .../pygolang/os
dir_testprog = dir_os + "/testprog" # .../pygolang/os/testprog
# default to use SIGUSR1/SIGUSR2 in tests.
# but use SIGTERM/SIGINT if those are not available (windows).
try:
SIG1 = getattr(syscall, 'SIGUSR1')
SIG2 = getattr(syscall, 'SIGUSR2')
except AttributeError:
SIG1 = syscall.SIGTERM
SIG2 = syscall.SIGINT
N = 1000
# test_signal verifies signal delivery to channels controlled by Notify/Stop/Ignore/Reset.
......@@ -43,9 +53,9 @@ N = 1000
def test_signal():
# Notify/Stop with wrong chan dtype -> panic
_ = panics("pychan: channel type mismatch")
with _: signal.Notify(chan(2), syscall.SIGUSR1)
with _: signal.Notify(chan(2), SIG1)
with _: signal.Stop (chan(2))
with _: signal.Notify(chan(2, dtype='C.int'), syscall.SIGUSR1)
with _: signal.Notify(chan(2, dtype='C.int'), SIG1)
with _: signal.Stop (chan(2, dtype='C.int'))
# Notify/Ignore/Reset with wrong signal type
......@@ -54,109 +64,109 @@ def test_signal():
with _: signal.Ignore(None)
with _: signal.Reset(None)
# subscribe ch1(USR1), ch12(USR1,USR2) and ch2(USR2)
# subscribe ch1(SIG1), ch12(SIG1,SIG2) and ch2(SIG2)
ch1 = chan(2, dtype=gos.Signal)
ch12 = chan(2, dtype=gos.Signal)
ch2 = chan(2, dtype=gos.Signal)
signal.Notify(ch1, syscall.SIGUSR1)
signal.Notify(ch12, syscall.SIGUSR1, syscall.SIGUSR2)
signal.Notify(ch2, syscall.SIGUSR2)
signal.Notify(ch1, SIG1)
signal.Notify(ch12, SIG1, SIG2)
signal.Notify(ch2, SIG2)
def _():
signal.Reset()
defer(_)
for i in range(N):
# raise SIGUSR1 -> should be delivered to ch1 and ch12
# raise SIG1 -> should be delivered to ch1 and ch12
assert len(ch1) == 0
assert len(ch12) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
waitfor(lambda: len(ch1) == 1 and len(ch12) == 1)
sig1 = ch1.recv()
sig12 = ch12.recv()
assert sig1 == syscall.SIGUSR1
assert sig12 == syscall.SIGUSR1
assert sig1 == SIG1
assert sig12 == SIG1
# raise SIGUSR2 -> should be delivered to ch12 and ch2
# raise SIG2 -> should be delivered to ch12 and ch2
assert len(ch1) == 0
assert len(ch12) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR2)
killme(SIG2)
waitfor(lambda: len(ch12) == 1 and len(ch2) == 1)
sig12 = ch12.recv()
sig2 = ch2.recv()
assert sig12 == syscall.SIGUSR2
assert sig2 == syscall.SIGUSR2
# if SIGUSR2 will be eventually delivered to ch1 - it will break
# in SIGUSR1 check on next iteration.
assert sig12 == SIG2
assert sig2 == SIG2
# if SIG2 will be eventually delivered to ch1 - it will break
# in SIG1 check on next iteration.
# Stop(ch2) -> signals should not be delivered to ch2 anymore
signal.Stop(ch2)
for i in range(N):
# USR1 -> ch1, ch12
# SIG1 -> ch1, ch12
assert len(ch1) == 0
assert len(ch12) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
waitfor(lambda: len(ch1) == 1 and len(ch12) == 1)
sig1 = ch1.recv()
sig12 = ch12.recv()
assert sig1 == syscall.SIGUSR1
assert sig12 == syscall.SIGUSR1
assert sig1 == SIG1
assert sig12 == SIG1
# USR2 -> ch12, !ch2
# SIG2 -> ch12, !ch2
assert len(ch1) == 0
assert len(ch12) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR2)
killme(SIG2)
waitfor(lambda: len(ch12) == 1)
sig12 = ch12.recv()
assert sig12 == syscall.SIGUSR2
# if SIGUSR2 will be eventually delivered to ch2 - it will break on
assert sig12 == SIG2
# if SIG2 will be eventually delivered to ch2 - it will break on
# next iteration.
# Ignore(USR1) -> ch1 should not be delivered to anymore, ch12 should be delivered only USR2
signal.Ignore(syscall.SIGUSR1)
# Ignore(SIG1) -> ch1 should not be delivered to anymore, ch12 should be delivered only SIG2
signal.Ignore(SIG1)
for i in range(N):
# USR1 -> ø
# SIG1 -> ø
assert len(ch1) == 0
assert len(ch12) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
time.sleep(1E-6)
# USR2 -> ch12
# SIG2 -> ch12
assert len(ch1) == 0
assert len(ch12) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR2)
killme(SIG2)
waitfor(lambda: len(ch12) == 1)
sig12 = ch12.recv()
assert sig12 == syscall.SIGUSR2
# if SIGUSR1 or SIGUSR2 will be eventually delivered to ch1 or ch2 - it
assert sig12 == SIG2
# if SIG1 or SIG2 will be eventually delivered to ch1 or ch2 - it
# will break on next iteration.
# Notify after Ignore
signal.Notify(ch1, syscall.SIGUSR1)
signal.Notify(ch1, SIG1)
for i in range(N):
# USR1 -> ch1
# SIG1 -> ch1
assert len(ch1) == 0
assert len(ch12) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
waitfor(lambda: len(ch1) == 1)
sig1 = ch1.recv()
assert sig1 == syscall.SIGUSR1
assert sig1 == SIG1
# USR2 -> ch12
# SIG2 -> ch12
assert len(ch1) == 0
assert len(ch12) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR2)
killme(SIG2)
waitfor(lambda: len(ch12) == 1)
sig12 = ch12.recv()
assert sig12 == syscall.SIGUSR2
# if SIGUSR1 or SIGUSR2 will be eventually delivered to wrong place -
assert sig12 == SIG2
# if SIG1 or SIG2 will be eventually delivered to wrong place -
# it will break on next iteration.
# Reset is tested in test_stdlib_interop (it needs non-terminating default
......@@ -173,11 +183,11 @@ def test_notify_reinstall():
for i in range(N):
signal.Stop(ch)
signal.Notify(ch, syscall.SIGUSR1)
signal.Notify(ch, SIG1)
time.sleep(0.1*time.second)
assert len(ch) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
time.sleep(0.1*time.second)
assert len(ch) == 1
......@@ -206,23 +216,23 @@ def test_stdlib_interop():
ch1 = chan(2, dtype=object) # NOTE not gos.Signal nor 'C.os::Signal'
def _(signo, frame):
ch1.send("USR1")
pysig.signal(pysig.SIGUSR1, _)
ch1.send("SIG1")
pysig.signal(SIG1.signo, _)
def _():
pysig.signal(pysig.SIGUSR1, pysig.SIG_IGN)
pysig.signal(SIG1.signo, pysig.SIG_IGN)
defer(_)
# verify that plain pysig delivery works
for i in range(N):
assert len(ch1) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
waitfor(lambda: len(ch1) == 1)
obj1 = ch1.recv()
assert obj1 == "USR1"
assert obj1 == "SIG1"
# verify that combined pysig + golang.os.signal delivery works
ch2 = chan(2, dtype=gos.Signal)
signal.Notify(ch2, syscall.SIGUSR1)
signal.Notify(ch2, SIG1)
def _():
signal.Stop(ch2)
defer(_)
......@@ -230,46 +240,46 @@ def test_stdlib_interop():
for i in range(N):
assert len(ch1) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
waitfor(lambda: len(ch1) == 1 and len(ch2) == 1)
obj1 = ch1.recv()
sig2 = ch2.recv()
assert obj1 == "USR1"
assert sig2 == syscall.SIGUSR1
assert obj1 == "SIG1"
assert sig2 == SIG1
# Ignore stops delivery to both pysig and golang.os.signal
signal.Ignore(syscall.SIGUSR1)
signal.Ignore(SIG1)
for i in range(N):
assert len(ch1) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
time.sleep(1E-6)
time.sleep(0.1) # just in case
assert len(ch1) == 0
assert len(ch2) == 0
# after Reset pysig delivery is restored even after Ignore
signal.Reset(syscall.SIGUSR1)
signal.Reset(SIG1)
for i in range(N):
assert len(ch1) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
waitfor(lambda: len(ch1) == 1)
assert len(ch2) == 0
obj1 = ch1.recv()
assert obj1 == "USR1"
assert obj1 == "SIG1"
# Reset stops delivery to golang.os.signal and restores pysig delivery
signal.Notify(ch2, syscall.SIGUSR1)
signal.Reset(syscall.SIGUSR1)
signal.Notify(ch2, SIG1)
signal.Reset(SIG1)
for i in range(N):
assert len(ch1) == 0
assert len(ch2) == 0
killme(syscall.SIGUSR1)
killme(SIG1)
waitfor(lambda: len(ch1) == 1)
assert len(ch2) == 0
obj1 = ch1.recv()
assert obj1 == "USR1"
assert obj1 == "SIG1"
# test_stdlib_interop_KeyboardInterrupt verifies that signal.{Notify,Ignore} disable
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment