|
|
- """
- Scheduler queues
- """
-
- import marshal
- from six.moves import cPickle as pickle
-
- from queuelib import queue
-
- def _serializable_queue(queue_class, serialize, deserialize):
-
- class SerializableQueue(queue_class):
-
- def push(self, obj):
- s = serialize(obj)
- super(SerializableQueue, self).push(s)
-
- def pop(self):
- s = super(SerializableQueue, self).pop()
- if s:
- return deserialize(s)
-
- return SerializableQueue
-
- def _pickle_serialize(obj):
- try:
- return pickle.dumps(obj, protocol=2)
- # Python>=3.5 raises AttributeError here while
- # Python<=3.4 raises pickle.PicklingError
- except (pickle.PicklingError, AttributeError) as e:
- raise ValueError(str(e))
-
- PickleFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue, \
- _pickle_serialize, pickle.loads)
- PickleLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, \
- _pickle_serialize, pickle.loads)
- MarshalFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue, \
- marshal.dumps, marshal.loads)
- MarshalLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, \
- marshal.dumps, marshal.loads)
- FifoMemoryQueue = queue.FifoMemoryQueue
- LifoMemoryQueue = queue.LifoMemoryQueue
|