美文网首页
jupytor-client development

jupytor-client development

作者: TR老于 | 来源:发表于2018-11-21 13:30 被阅读0次

Source

GitHub: https://github.com/jupyter/jupyter_client

Document

Jupyter Client: https://media.readthedocs.org/pdf/jupyter-client/latest/jupyter-client.pdf

Dev

aliyun
/docker_python/ipython/jupyter_client/jupyter_client/work

# ls
__init__.py  myclient.py

cat myclient.py


import os
import sys
pjoin = os.path.join
#from unittest import TestCase

from jupyter_client.kernelspec import KernelSpecManager, NoSuchKernel, NATIVE_KERNEL_NAME
from jupyter_client.manager import start_new_kernel
from jupyter_client.tests.utils import test_env

import pytest

from ipython_genutils.py3compat import string_types
from IPython.utils.capture import capture_output

TIMEOUT = 30

class myclient():
    def setUp(self):
        self.env_patch = test_env()
        self.env_patch.start()
        #self.addCleanup(self.env_patch.stop)
        try:
            KernelSpecManager().get_kernel_spec(NATIVE_KERNEL_NAME)
        except NoSuchKernel:
            pytest.skip()
        self.km, self.kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME)
        print("kernel:",NATIVE_KERNEL_NAME)
        #self.addCleanup(self.kc.stop_channels)
        #self.addCleanup(self.km.shutdown_kernel)

    
    def _check_reply(self, reply_type, reply):
        #self.assertIsInstance(reply, dict)
        #self.assertEqual(reply['header']['msg_type'], reply_type + '_reply')
        #self.assertEqual(reply['parent_header']['msg_type'], reply_type + '_request')
        print("check_reply()")

    def test_comm_info(self):
        kc = self.kc
        msg_id = kc.comm_info()
        #self.assertIsInstance(msg_id, string_types)
        reply = kc.comm_info(reply=True, timeout=TIMEOUT)
        print("reply",reply)
        self._check_reply('comm_info', reply)

    def stop(self):
        self.kc.stop_channels()
        self.km.shutdown_kernel()

    def test_execute_interactive(self):
        print(sys._getframe().f_code.co_name)
        kc = self.kc
        with capture_output() as io:
            reply = kc.execute_interactive("print('hello world')", timeout=TIMEOUT)
        assert 'hello world' in io.stdout
        print(io.stdout)
        assert reply['content']['status'] == 'ok'

client = myclient()
client.setUp()
client.test_comm_info()
client.test_execute_interactive()
client.stop()

python myclient.py

相关文章

网友评论

      本文标题:jupytor-client development

      本文链接:https://www.haomeiwen.com/subject/pbypaftx.html