File-system Transport - kombu.transport.filesystem
File-system Transport module for kombu.
Transport using the file system as the message store. Messages written to the queue are stored in the data_folder_out directory, and messages read from the queue are read from the data_folder_in directory. Both directories must be created manually. A simple example:
Producer:
import kombu

# Connect using the filesystem transport and its two message folders.
conn = kombu.Connection(
    'filesystem://', transport_options={
        'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
    }
)
conn.connect()

test_queue = kombu.Queue('test', routing_key='test')

with conn as conn:
    with conn.default_channel as channel:
        producer = kombu.Producer(channel)
        # Declare the queue, then publish one pickled message to it.
        producer.publish(
            {'hello': 'world'},
            retry=True,
            exchange=test_queue.exchange,
            routing_key=test_queue.routing_key,
            declare=[test_queue],
            serializer='pickle'
        )
Consumer:
import kombu

# Same transport, with the two folders swapped relative to the producer.
conn = kombu.Connection(
    'filesystem://', transport_options={
        'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
    }
)
conn.connect()


def callback(body, message):
    # Print the decoded body, then acknowledge the message.
    print(body, message)
    message.ack()


test_queue = kombu.Queue('test', routing_key='test')

with conn as conn:
    with conn.default_channel as channel:
        # The producer used pickle, so it must be accepted explicitly.
        consumer = kombu.Consumer(
            channel, [test_queue], accept=['pickle']
        )
        consumer.register_callback(callback)
        with consumer:
            conn.drain_events(timeout=1)
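Because both directories must exist before the transport is used, it can help to create them in code and derive the two option dictionaries from the same pair of paths. The following is a minimal sketch using only the standard library; the helper name `mirrored_options`, the temporary base directory, and the folder names are illustrative assumptions, not part of Kombu.

```python
import tempfile
from pathlib import Path


def mirrored_options(base_dir):
    """Create both message folders and return (producer, consumer) options.

    Hypothetical helper for illustration; not part of Kombu.
    """
    base = Path(base_dir)
    dir_a = base / "data_in"
    dir_b = base / "data_out"

    # The transport expects both folders to exist already.
    for folder in (dir_a, dir_b):
        folder.mkdir(parents=True, exist_ok=True)

    producer_options = {
        "data_folder_in": str(dir_a),
        "data_folder_out": str(dir_b),
    }
    # The consumer uses the same two folders with the roles swapped.
    consumer_options = {
        "data_folder_in": str(dir_b),
        "data_folder_out": str(dir_a),
    }
    return producer_options, consumer_options


base_dir = tempfile.mkdtemp(prefix="kombu_fs_")
producer_options, consumer_options = mirrored_options(base_dir)
```

Each dictionary can then be passed as transport_options to kombu.Connection('filesystem://', ...) on the corresponding side, as in the Producer and Consumer examples above.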
Features
Type: Virtual
Supports Direct: Yes
Supports Topic: Yes
Supports Fanout: Yes
Supports Priority: No
Supports TTL: No
Connection String
Connection string is in the following format:
filesystem://
Transport Options
- data_folder_in - directory from which messages are read when reading from the queue.
- data_folder_out - directory in which messages are stored when written to the queue.
- store_processed - if set to True, all processed messages are backed up to processed_folder.
- processed_folder - directory where processed files are backed up.
- control_folder - directory where the exchange-queue tables are stored.
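As a rough illustration of how these options fit together, the sketch below builds one transport_options dictionary that sets all five keys and enables backup of processed messages. The temporary base directory and folder names are assumptions chosen for the example. Whether the transport creates the processed and control folders itself is not stated here, so creating every folder up front is a conservative assumption.

```python
import tempfile
from pathlib import Path

# Hypothetical base directory for the example.
base = Path(tempfile.mkdtemp(prefix="kombu_fs_"))

transport_options = {
    "data_folder_in": str(base / "in"),
    "data_folder_out": str(base / "out"),
    "store_processed": True,  # back up processed messages
    "processed_folder": str(base / "processed"),
    "control_folder": str(base / "control"),
}

# Create every folder-valued option before the transport is used.
for value in transport_options.values():
    if isinstance(value, str):
        Path(value).mkdir(parents=True, exist_ok=True)
```

The resulting dictionary would be passed as kombu.Connection('filesystem://', transport_options=transport_options).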
Transport
- class kombu.transport.filesystem.Transport(client, **kwargs)
  Filesystem Transport.
  - class Channel(connection, **kwargs)
    Filesystem Channel.
    - property control_folder
    - property data_folder_in
    - property data_folder_out
    - get_table(exchange)
      Get table of bindings for exchange.
    - property processed_folder
    - property store_processed
    - supports_fanout = True
      Flag set if the channel supports fanout exchanges.
    - property transport_options
  - default_port = 0
    Port number used when no port is specified.
  - driver_name = 'filesystem'
    Name of driver library (e.g. ‘py-amqp’, ‘redis’).
  - driver_type = 'filesystem'
    Type of driver, can be used to separate transports using the AMQP protocol (driver_type: ‘amqp’), Redis (driver_type: ‘redis’), etc…
  - global_state = <kombu.transport.virtual.base.BrokerState object>
  - implements = {'asynchronous': False, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
Channel
- class kombu.transport.filesystem.Channel(connection, **kwargs)
  Filesystem Channel.
  - property control_folder
  - property data_folder_in
  - property data_folder_out
  - property processed_folder
  - property store_processed
  - supports_fanout = True
    Flag set if the channel supports fanout exchanges.
  - property transport_options