nuxeo-importer-core contains several code samples that can be adapted to run imports that take advantage of:
- thread-pooling
- batching (import several documents inside a given transaction)
- event processing filtering (enable bulk mode or skip some events)
This is the most efficient solution to run very fast imports.
However, the default implementation used to come with some limitations and constraints :
- extending the importer is done in Java
- this can be an issue for non Java developers
- multi-threading policy can be complex
- multi-threading policy depends on the source layout and dependencies between entries
- if import fails in the middle, then it must be restarted
The work on the queue-based importer and Kafka aims to address these limitations.
We want the importer infrastructure to promote a clear separation between the 2 sides of the import process :
- Reader / Producer : the one reading the input data (from files, DB ...)
- Writer / Consumer : the one writing the data into the Nuxeo Repository
By decoupling the Reader and Writer, we have several gains :
- we can make the Writer/Consumer part very generic
- have a highly optimized importer engine
- we can run separately the producer and the consumer
- this means we can more easily re-run the import without being forced to re-run all the pre-processing
- developers "working on the import process" have mainly to work on the Reader/Producer part
- this part being mainly decoupled from Nuxeo, they do not have to be Nuxeo developers
In order to have this decoupling, the idea is to add a queue between the 2 parts of the importer:
Source data => Producer => Queue(s) => Consumer => Import Data in Nuxeo
This is a new implementation of the importer: nuxeo-importer-queues. We clearly split the importer flow into 2 sub-parts and make the queue system externalizable.
- Import part 1
- read the data from the source
- build an import message (can include some transformation)
- en-queue the message
- Import part 2
- read the message from the queue
- create a document inside the repository based on the message
The queue in the middle also allows us to completely decouple the threading model between the 2 parts :
- part 1 can be mono-threaded if this is simpler (since this is usually not the bottleneck)
- part 2 is by default multi-threaded and batched to increase performance
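The two-part flow and its independent threading models can be illustrated with a minimal, in-process sketch. It uses Python's standard `queue.Queue` as a stand-in for the real queue backend, and every name in it (`produce`, `consume`, `flush`, `run_import`, `BATCH_SIZE`, `NB_CONSUMERS`) is hypothetical; it does not reflect the actual nuxeo-importer-queues API.

```python
import queue
import threading

BATCH_SIZE = 3      # documents written per simulated transaction (illustrative value)
NB_CONSUMERS = 2    # number of consumer threads (illustrative value)
STOP = object()     # sentinel telling a consumer that no more messages will come


def produce(source, q):
    """Part 1: read the source, build one message per entry and enqueue it."""
    for index, title in enumerate(source):
        q.put({"title": title, "path": "/import/doc-%d" % index})
    # One sentinel per consumer so that every consumer thread terminates.
    for _ in range(NB_CONSUMERS):
        q.put(STOP)


def flush(batch, written, lock):
    """Stand-in for creating the documents and committing one transaction."""
    with lock:
        written.extend(batch)


def consume(q, written, lock):
    """Part 2: dequeue messages and write them in batches."""
    batch = []
    while True:
        message = q.get()
        if message is STOP:
            break
        batch.append(message)
        if len(batch) == BATCH_SIZE:
            flush(batch, written, lock)
            batch = []
    if batch:
        flush(batch, written, lock)  # write the last, incomplete batch


def run_import(source):
    q = queue.Queue()
    written, lock = [], threading.Lock()
    consumers = [threading.Thread(target=consume, args=(q, written, lock))
                 for _ in range(NB_CONSUMERS)]
    for thread in consumers:
        thread.start()
    produce(source, q)  # the producer stays single-threaded
    for thread in consumers:
        thread.join()
    return written
```

Because the producer only knows about the queue, it could be replaced by a process written in another language without touching the consumer side.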
The queuing system can be provided by different backends. nuxeo-importer-queues currently supports 2 backends :
- Chronicle Queue
- in JVM but easy to set up
- Apache Kafka
- distributed MOM
Kafka may be a little more complex to deploy, but in exchange for the additional setup effort it provides some benefits:
- you can scale the queue between several servers
- this means that you can run import on different Nuxeo Server nodes
- you are not limited by available memory
- Kafka stores queues on disk
- you can write the client part in any language supported by Kafka
- Java, JavaScript, Python, .Net, Ruby ...
More than the Java API, the real interface you need to implement on the producer side is the message format.
The importer requires messages in a specific format. You have to use Apache Avro to serialize each message to the required format. We provide an Avro template that should be used for message creation.
The format is intended to be generic enough for most use cases.
Default Avro message format:
{
"namespace": "org.nuxeo.ecm.platform.importer.kafka.avro",
"type": "record",
"name": "Message",
"fields": [
{ "name": "title", "type": "string" },
{ "name": "path", "type": "string" },
{ "name": "folderish", "type": "boolean" },
{ "name": "properties", "type": { "type": "map", "values": "string" }},
{ "name": "parent", "type": "string" },
{ "name": "type", "type": "string" }
]
}
The most valuable part is the properties field. It can be filled with any custom fields you have in your document.
- Java developers can use command line tools to generate Java classes, which are then used to create the message objects
- Python and JS developers have to create JSON objects that conform to the required format. Avro libraries have tools that check whether a message matches the template. Messages that do not conform will not be imported.
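To make the shape of a conforming message concrete, here is a minimal sketch that builds one message as a Python dictionary and checks it against the fields of the default schema. It is only a hand-written structural check using the standard library, not a replacement for the validation and serialization done by a real Avro library. The helper names (`conforms`, `problems`) and all sample values, including the meaning given to `path` and `parent`, are illustrative assumptions.

```python
import json

# The default schema from this section, reproduced so the sketch is self-contained.
SCHEMA = json.loads("""
{
  "namespace": "org.nuxeo.ecm.platform.importer.kafka.avro",
  "type": "record",
  "name": "Message",
  "fields": [
    { "name": "title", "type": "string" },
    { "name": "path", "type": "string" },
    { "name": "folderish", "type": "boolean" },
    { "name": "properties", "type": { "type": "map", "values": "string" }},
    { "name": "parent", "type": "string" },
    { "name": "type", "type": "string" }
  ]
}
""")


def conforms(value, avro_type):
    """Check a Python value against the few Avro types this schema uses."""
    if avro_type == "string":
        return isinstance(value, str)
    if avro_type == "boolean":
        return isinstance(value, bool)
    if isinstance(avro_type, dict) and avro_type.get("type") == "map":
        return (isinstance(value, dict)
                and all(isinstance(k, str) and conforms(v, avro_type["values"])
                        for k, v in value.items()))
    return False


def problems(message, schema=SCHEMA):
    """Return a list of problems; an empty list means the message fits."""
    found = []
    names = [field["name"] for field in schema["fields"]]
    for field in schema["fields"]:
        if field["name"] not in message:
            found.append("missing field: " + field["name"])
        elif not conforms(message[field["name"]], field["type"]):
            found.append("wrong type for field: " + field["name"])
    found.extend("unknown field: " + key for key in message if key not in names)
    return found


# Hypothetical sample values, chosen only for illustration.
message = {
    "title": "Annual report",
    "path": "/default-domain/workspaces/reports/annual-report",
    "folderish": False,
    "properties": {"dc:description": "Imported through the queue"},
    "parent": "/default-domain/workspaces/reports",
    "type": "File",
}
```

Note that every value in `properties` must be a string, since the schema declares the field as a map of strings.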
Neither Chronicle Queue nor Apache Kafka is suitable for handling binaries. You should keep each message under 30KB. Binaries should be pre-imported into the Binary Manager used on your instances, and their hash codes added to the appropriate messages. The Importer expects to find an empty Blob: the blob should not contain any binary content, only the hash string that represents an actual binary in your storage.
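As a rough sketch of the producer-side preparation this implies, the following computes the digest of a binary, references it from the message properties, and checks the 30KB guideline. Two points are assumptions to verify against your own setup: the digest algorithm (MD5 is used here, and it must match the one configured for your Binary Manager) and the property key `file:content:digest`, which is a hypothetical placeholder rather than a documented name. The size check uses the JSON form of the message as an approximation of the serialized size.

```python
import hashlib
import json

MAX_MESSAGE_BYTES = 30 * 1024  # the 30KB guideline given above


def binary_digest(data, algorithm="md5"):
    """Hex digest of a binary; the algorithm must match your Binary Manager."""
    return hashlib.new(algorithm, data).hexdigest()


def attach_digest(message, data, key="file:content:digest"):
    """Return a copy of the message whose properties reference the binary by digest.

    The property key is a hypothetical placeholder, not a documented name.
    """
    properties = dict(message.get("properties", {}))
    properties[key] = binary_digest(data)
    return dict(message, properties=properties)


def is_small_enough(message):
    """Approximate the message size by its UTF-8 encoded JSON form."""
    return len(json.dumps(message).encode("utf-8")) <= MAX_MESSAGE_BYTES
```

The binary itself never travels through the queue; only the short digest string does, which keeps messages far below the size limit.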
- To use Kafka you are required to create a marketplace package that deploys the DefaultKafkaComponent. The system expects to find the org.nuxeo.ecm.platform.importer.kafka.service.DefaultKafkaComponent contribution. For more details about KafkaConsumer and KafkaProducer configuration, read the official documentation.
- Below you will find an example of the required fields.
<extension target="org.nuxeo.ecm.platform.importer.kafka.service.DefaultKafkaComponent" point="kafkaConfiguration">
<kafkaConfig
bootstrapServer="127.0.0.1:9092">
<topics>
<topic>test</topic>
</topics>
<producerConfigs>
<property name="bootstrap.servers">localhost:9092</property>
<property name="acks">all</property>
<property name="retries">0</property>
<property name="batch.size">4194304</property>
<property name="linger.ms">0</property>
<property name="max.block.ms">1000</property>
<property name="compression.type">none</property>
<property name="key.serializer">org.apache.kafka.common.serialization.StringSerializer</property>
<property name="value.serializer">org.apache.kafka.common.serialization.StringSerializer</property>
</producerConfigs>
<consumerConfigs>
<property name="bootstrap.servers">localhost:9092</property>
<property name="group.id">testGroup</property>
<property name="enable.auto.commit">true</property>
<property name="auto.offset.reset">earliest</property>
<property name="auto.commit.interval.ms">1000</property>
<property name="heartbeat.interval.ms">3000</property>
<property name="session.timeout.ms">10000</property>
<property name="request.timeout.ms">15000</property>
<property name="max.partition.fetch.bytes">30720</property>
<property name="max.poll.records">100</property>
<property name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</property>
<property name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</property>
</consumerConfigs>
</kafkaConfig>
</extension>
- Additional fields can be added, but be aware that these configurations are very sensitive.
- Pay attention to session.timeout.ms. The duration of the consumer's operation should be known at the planning stage. session.timeout.ms must be less than request.timeout.ms. max.poll.records corresponds to the batch.size configuration on the Nuxeo side. You have to keep these two numbers equal, otherwise the importer can produce duplicates or lose some messages at the rollback step.
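The two constraints stated above (session.timeout.ms below request.timeout.ms, and max.poll.records equal to the batch size on the Nuxeo side) are easy to get wrong when a configuration is edited by hand. A small pre-deployment check could look like the sketch below; `check_consumer_config` is a hypothetical helper, not part of the importer, and it takes the consumer properties as a plain dictionary.

```python
def check_consumer_config(consumer_configs, nuxeo_batch_size):
    """Return the list of violated constraints for a consumer configuration."""
    issues = []
    session = int(consumer_configs["session.timeout.ms"])
    request = int(consumer_configs["request.timeout.ms"])
    max_poll = int(consumer_configs["max.poll.records"])
    if session >= request:
        issues.append("session.timeout.ms must be less than request.timeout.ms")
    if max_poll != nuxeo_batch_size:
        issues.append("max.poll.records must equal the Nuxeo batch size")
    return issues


# Values copied from the sample contribution above.
sample = {
    "session.timeout.ms": "10000",
    "request.timeout.ms": "15000",
    "max.poll.records": "100",
}
```

With the sample values, the check passes for a Nuxeo batch size of 100 and reports a mismatch for any other batch size.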
- Requires Kafka 0.10.1.0
- Please follow the official documentation to deploy Kafka
- Please follow the official documentation to set up the broker side.
- Below is an example of the most sensitive configurations, recommended for your own instance.
# The number of threads handling network requests
num.network.threads=4
# The number of threads doing disk I/O
num.io.threads=4
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=5194304
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=5194304
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=519430400
# Enable topic deletion
delete.topic.enable=true
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/var/kafka
auto.create.topics.enable=false
# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=20
# max fetch size
replica.fetch.max.bytes=5242880
# max size of message received from producer per topic
message.max.bytes=262144
# compression
compression.type=gzip
- Please note that replica.fetch.max.bytes caps how many bytes are fetched per partition when brokers replicate a topic, whereas message.max.bytes ensures that the broker won't accept a message bigger than that size. So, please configure your consumers (for example max.partition.fetch.bytes) and producers accordingly.
- Kafka is written in Scala and Java, but clients are available for many other languages.
- We recommend using a virtual environment for Python. The example uses Python 3; the code should work with Python 2 as well.
- You need to install kafka-python from the command line: pip install kafka-python.
First you need to create a KafkaProducer, to which you pass the Kafka server address and port (the default port is 9092).
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092'
)
In this example we use a level-by-level import: each message is sent to a topic named after its level in the document hierarchy, so that the messages of a given level can be read from Kafka and imported in parallel.
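from kafka.errors import KafkaError


# buf is an in-memory buffer (e.g. io.BytesIO) holding an Avro-serialized message.
def send(buf, level, producer):
    # One topic per hierarchy level, e.g. 'level_2'
    lv = 'level_' + str(level)
    future = producer.send(
        topic=lv,
        value=buf.getvalue(),
        key=b'msg'
    )
    # Block until the asynchronous send completes (timeout is in seconds)
    try:
        metadata = future.get(timeout=1000)
        print(metadata)
    except KafkaError:
        print("Couldn't send message to topic " + lv)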
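The example above takes the level as a parameter without saying where it comes from. One plausible way to derive it, shown here purely as an assumption, is to use the depth of the document path. The helper names (`level_of`, `topic_for`, `group_by_topic`) are hypothetical and the sketch needs no Kafka connection.

```python
from collections import defaultdict


def level_of(path):
    """Depth of a document path: '/a' is level 1, '/a/b' is level 2."""
    return len([segment for segment in path.split("/") if segment])


def topic_for(path, prefix="level_"):
    """Name of the topic a message for this path would be sent to."""
    return prefix + str(level_of(path))


def group_by_topic(paths):
    """Group document paths by target topic, lowest level first."""
    groups = defaultdict(list)
    for path in paths:
        groups[topic_for(path)].append(path)
    # Sort on the numeric suffix so that 'level_10' comes after 'level_2'.
    return dict(sorted(groups.items(),
                       key=lambda item: int(item[0].rsplit("_", 1)[1])))
```

Processing topics in increasing level order means parents can be created before their children, while documents within one level are free to be imported in parallel.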
For more information about kafka-python, please read the following documentation.
- threads
- For best performance, use no more than two threads per available core.
- Note that producers and consumers can be run within the same system.
- No
- document factory
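For client-side code written in Python, the guideline of at most two threads per available core can be applied when sizing a worker pool. The following is a minimal sketch using only the standard library; `max_import_threads` and `run_in_pool` are hypothetical helper names, and the importer itself sizes its threads through its own configuration.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def max_import_threads(threads_per_core=2):
    """Upper bound on worker threads: two per available core by default."""
    return (os.cpu_count() or 1) * threads_per_core


def run_in_pool(task, items):
    """Run task over items with a pool capped at the guideline above."""
    with ThreadPoolExecutor(max_workers=max_import_threads()) as pool:
        return list(pool.map(task, items))
```

`pool.map` returns results in the order of the input items, regardless of which worker finished first.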