Source
GitHub: https://github.com/jupyter/jupyter_client
Document
Jupyter Client: https://media.readthedocs.org/pdf/jupyter-client/latest/jupyter-client.pdf
Dev
aliyun
/docker_python/ipython/jupyter_client/jupyter_client/work
# ls
__init__.py myclient.py
cat myclient.py
import os
import sys
pjoin = os.path.join
#from unittest import TestCase
from jupyter_client.kernelspec import KernelSpecManager, NoSuchKernel, NATIVE_KERNEL_NAME
from jupyter_client.manager import start_new_kernel
from jupyter_client.tests.utils import test_env
import pytest
from ipython_genutils.py3compat import string_types
from IPython.utils.capture import capture_output
TIMEOUT = 30
class myclient():
def setUp(self):
self.env_patch = test_env()
self.env_patch.start()
#self.addCleanup(self.env_patch.stop)
try:
KernelSpecManager().get_kernel_spec(NATIVE_KERNEL_NAME)
except NoSuchKernel:
pytest.skip()
self.km, self.kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME)
print("kernel:",NATIVE_KERNEL_NAME)
#self.addCleanup(self.kc.stop_channels)
#self.addCleanup(self.km.shutdown_kernel)
def _check_reply(self, reply_type, reply):
#self.assertIsInstance(reply, dict)
#self.assertEqual(reply['header']['msg_type'], reply_type + '_reply')
#self.assertEqual(reply['parent_header']['msg_type'], reply_type + '_request')
print("check_reply()")
def test_comm_info(self):
kc = self.kc
msg_id = kc.comm_info()
#self.assertIsInstance(msg_id, string_types)
reply = kc.comm_info(reply=True, timeout=TIMEOUT)
print("reply",reply)
self._check_reply('comm_info', reply)
def stop(self):
self.kc.stop_channels()
self.km.shutdown_kernel()
def test_execute_interactive(self):
print(sys._getframe().f_code.co_name)
kc = self.kc
with capture_output() as io:
reply = kc.execute_interactive("print('hello world')", timeout=TIMEOUT)
assert 'hello world' in io.stdout
print(io.stdout)
assert reply['content']['status'] == 'ok'
client = myclient()
client.setUp()
client.test_comm_info()
client.test_execute_interactive()
client.stop()
python myclient.py
网友评论