Test Core Eventing (#205)

* Add core event tests

* Update .gitignore with coverage

* Add shortlink gif

* Add event dispatcher test

* Test event subscriber
This commit is contained in:
Abhinav Singh 2019-11-30 22:04:43 -08:00 committed by GitHub
parent dc1d5b68e7
commit 654696d2f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 214 additions and 2 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
.coverage
.coverage.*
.idea
.vscode
.project

View File

@ -314,6 +314,8 @@ Plugin Examples
Add support for short links in your favorite browsers / applications.
[![Shortlink Plugin](https://raw.githubusercontent.com/abhinavsingh/proxy.py/testit/shortlink.gif)](https://github.com/abhinavsingh/proxy.py#shortlinkplugin)
Start `proxy.py` as:
```

View File

@ -147,12 +147,15 @@ class EventDispatcher:
for sub_id in unsub_ids:
del self.subscribers[sub_id]
def run_once(self) -> None:
ev: Dict[str, Any] = self.event_queue.queue.get(timeout=1)
self.handle_event(ev)
def run(self) -> None:
try:
while not self.shutdown.is_set():
try:
ev: Dict[str, Any] = self.event_queue.queue.get(timeout=1)
self.handle_event(ev)
self.run_once()
except queue.Empty:
pass
except EOFError:

BIN
shortlink.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 124 KiB

View File

@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import multiprocessing
import os
import threading
import unittest
import queue
from unittest import mock
from proxy.common.types import DictQueueType
from proxy.core.event import EventDispatcher, EventQueue, eventNames
class TestEventDispatcher(unittest.TestCase):
def setUp(self) -> None:
self.dispatcher_shutdown = threading.Event()
self.event_queue = EventQueue()
self.dispatcher = EventDispatcher(
shutdown=self.dispatcher_shutdown,
event_queue=self.event_queue)
def tearDown(self) -> None:
self.dispatcher_shutdown.set()
def test_empties_queue(self) -> None:
self.event_queue.publish(
request_id='1234',
event_name=eventNames.WORK_STARTED,
event_payload={'hello': 'events'},
publisher_id=self.__class__.__name__
)
self.dispatcher.run_once()
with self.assertRaises(queue.Empty):
self.dispatcher.run_once()
@mock.patch('time.time')
def subscribe(self, mock_time: mock.Mock) -> DictQueueType:
mock_time.return_value = 1234567
q = multiprocessing.Manager().Queue()
self.event_queue.subscribe(sub_id='1234', channel=q)
self.dispatcher.run_once()
self.event_queue.publish(
request_id='1234',
event_name=eventNames.WORK_STARTED,
event_payload={'hello': 'events'},
publisher_id=self.__class__.__name__
)
self.dispatcher.run_once()
self.assertEqual(q.get(), {
'request_id': '1234',
'process_id': os.getpid(),
'thread_id': threading.get_ident(),
'event_timestamp': 1234567,
'event_name': eventNames.WORK_STARTED,
'event_payload': {'hello': 'events'},
'publisher_id': self.__class__.__name__,
})
return q
def test_subscribe(self) -> None:
self.subscribe()
def test_unsubscribe(self) -> None:
q = self.subscribe()
self.event_queue.unsubscribe('1234')
self.dispatcher.run_once()
self.event_queue.publish(
request_id='1234',
event_name=eventNames.WORK_STARTED,
event_payload={'hello': 'events'},
publisher_id=self.__class__.__name__
)
self.dispatcher.run_once()
with self.assertRaises(queue.Empty):
q.get(timeout=0.1)
def test_unsubscribe_on_broken_pipe_error(self) -> None:
pass
def test_run(self) -> None:
pass

View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import multiprocessing
import os
import threading
import unittest
from unittest import mock
from proxy.core.event import EventQueue, eventNames
class TestCoreEvent(unittest.TestCase):
@mock.patch('time.time')
def test_publish(self, mock_time: mock.Mock) -> None:
mock_time.return_value = 1234567
evq = EventQueue()
evq.publish(
request_id='1234',
event_name=eventNames.WORK_STARTED,
event_payload={'hello': 'events'},
publisher_id=self.__class__.__name__
)
self.assertEqual(evq.queue.get(), {
'request_id': '1234',
'process_id': os.getpid(),
'thread_id': threading.get_ident(),
'event_timestamp': 1234567,
'event_name': eventNames.WORK_STARTED,
'event_payload': {'hello': 'events'},
'publisher_id': self.__class__.__name__,
})
def test_subscribe(self) -> None:
evq = EventQueue()
q = multiprocessing.Manager().Queue()
evq.subscribe('1234', q)
ev = evq.queue.get()
self.assertEqual(ev['event_name'], eventNames.SUBSCRIBE)
self.assertEqual(ev['event_payload']['sub_id'], '1234')
def test_unsubscribe(self) -> None:
evq = EventQueue()
evq.unsubscribe('1234')
ev = evq.queue.get()
self.assertEqual(ev['event_name'], eventNames.UNSUBSCRIBE)
self.assertEqual(ev['event_payload']['sub_id'], '1234')

View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import os
import threading
import unittest
from typing import Dict, Any
from unittest import mock
from proxy.core.event import EventQueue, EventDispatcher, EventSubscriber, eventNames
PUBLISHER_ID = threading.get_ident()
class TestEventSubscriber(unittest.TestCase):
@mock.patch('time.time')
def test_event_subscriber(self, mock_time: mock.Mock) -> None:
mock_time.return_value = 1234567
self.dispatcher_shutdown = threading.Event()
self.event_queue = EventQueue()
self.dispatcher = EventDispatcher(
shutdown=self.dispatcher_shutdown,
event_queue=self.event_queue)
self.subscriber = EventSubscriber(self.event_queue)
self.subscriber.subscribe(self.callback)
self.dispatcher.run_once()
self.event_queue.publish(
request_id='1234',
event_name=eventNames.WORK_STARTED,
event_payload={'hello': 'events'},
publisher_id=self.__class__.__name__
)
self.dispatcher.run_once()
self.subscriber.unsubscribe()
self.dispatcher.run_once()
self.dispatcher_shutdown.set()
def callback(self, ev: Dict[str, Any]) -> None:
self.assertEqual(ev, {
'request_id': '1234',
'process_id': os.getpid(),
'thread_id': PUBLISHER_ID,
'event_timestamp': 1234567,
'event_name': eventNames.WORK_STARTED,
'event_payload': {'hello': 'events'},
'publisher_id': self.__class__.__name__,
})