Cas d’usage Nifi : pagination REST, Jolt et monitoring

Un cas d’usage Nifi

Contexte

Chez Cyrès, nous gérons la saisie des temps via un outil pratique et puissant : Toggl. Dans le cadre de l’amélioration du reporting, m’est venu l’idée d’intégrer les rapports CSV de cet outil au sein de notre plateforme interne Cloudera. L’objectif de ce cas d’usage Nifi étant de pouvoir faire gagner du temps aux responsables qui utilisent ces métriques pour le pilotage des activités.

Je vais vous présenter ce cas d’usage simple et riche en fonctionnalités. La pipeline d’ingestion a été réalisée avec Nifi, un ETL créé par la NSA et confié à la fondation Apache en 2014. Cette pipeline est découpée en trois étapes que nous allons couvrir dans cet article : la récupération du CSV, l’enregistrement en base avec gestion du « pagination » REST et surveillance de la pipeline. Le stockage est fait sur Impala et le rendu visuel est possible via HUE.

Attention, ce cas d’usage Nifi permet de mettre en avant des fonctionnalités, des patterns et une mise en pratique. Mais il existe des alternatives certainement simple pour réaliser le travail de fond.

Composants utilisée pour ce cas d’usage nifi

 

Récupération du CSV

L’élément déclencheur pour la pipeline est un simple GenerateFlowFile planifié tous les 1er du mois. Le contenu de celui-ci nous importe peu. En revanche, j’ajoute un attribut initial que j’ai nommé « counter » afin de suivre le « pagination » REST que j’aborderais plus bas.

Cas d'usage Nifi GenerateFlowFile
GenerateFlowFile initiateur de la pipeline

Le flowfile est transmis à un processeur de type ExecuteScript qui sert à compenser un besoin. Pour requêter sur le mois précédent, j’ai besoin de la date du premier jour du mois ainsi que du dernier jour. L’Expression Language de Nifi ne me permet pas facilement de récupérer ces éléments. Par conséquent, avec un script Python, j’ajoute directement les éléments en attributs.

Cas d'usage Nifi Python script
Script python permettant l’ajout d’attribut à la volé

Le challenge de l’API Toggl est situé sur le nombre d’élément retourné par requête : 50.
Au-delà, il faut requêter l’API avec un paramètre supplémentaire pour récupérer la « page » 2, puis 3, ainsi de suite tant qu’il y a du contenu. A cela s’ajoute une valeur intéressante : le nombre de record total pour la requête donnée.

Voici le workflow pour récupérer les éléments :

Cas d'usage Nifi Global Workflow
Workflow global de requête API

Voyons en détail les opérations effectuées :

  1. Requête sur l’API Toggl avec comme paramètre page = 1 et les dates comme bornes temporelle. Voici l’url du processor InvokeHttp :
    Cas d'usage Nifi InvokeHttp URI
    Url du processor InvokeHttp
  2. Récupération du nombre total d’enregistrement via un EvaluateJsonPath
  3. Routage en fonction de deux éléments :
    • Le nombre d’élément est inférieur à 50, alors on route sur « stopped » et le flowfile est directement descendu dans l’output
    • Le nombre d’élément est supérieur à 50 fois la variable « counter », alors on route sur Next. Cela renvoie le flowfile en bas dans output pour aller vers l’enregistrement et on duplique (route) vers un UpdateAttribute afin d’incrémenter la variable « counter »
    • Puis on re requête l’API avec la nouvelle valeur du compteur et ainsi de suite jusqu’à avoir 50 x counter plus grand que le nombre de record et arrêter la boucle.

A la fin de la boucle, j’aurais transmis à l’output l’ensemble des fichiers CSV comportant tous les records de temps du mois, BINGO !

Sauvegarde du contenu

Nifi SaveData
Workflow de sauvegarde des données

L’output de la partie de récupération est ensuite transmis à un ensemble de processeurs qui s’occupe d’enregistrer le contenu dans une table Impala. Pour ce faire je commence par récupérer un sous élément de mon JSON.

La clef « data » contient la liste des records de temps. Je la récupère avec un EvaluateJsonPath. Ce flowfile est envoyé vers un JoltTransformRecord afin de procéder à un renommage des champs de l’ensemble de mes records. Cela permet une insertion directe dans Impala via un PutDatabaseRecord et ainsi rester sur un travail au niveau Record et non unitaire. Le JoltTransformRecord utilise un JsonTreeReader configuré par défaut et un JsonRecordSetWriter lui aussi configuré par défaut.

Pour plus d’info sur pourquoi j’utilise des RecordProcessor, je vous renvoie au super blog de Pierre VILLARD : https://pierrevillard.com/tag/record/ 

Voici la spec Jolt que j’utilise. Elle permet de renommer certains champs comme définis lors de la création de ma table Impala :

Cas d'usage Nifi JoltSpec
Spécification Jolt

Et voici la définition de ma table Impala :

Cas d'usage Nifi CreateTableImpala
Requête SQL de création de la table Impala

Pour l’insertion dans Impala, j’utilise un DBCPConnectionPool, configuré de la sorte :

  • Database url :  jdbc:impala://${impala_uri}:21050/default;AuthMech=1;KrbRealm=${realm};KrbHostFQDN=${impala_uri};KrbServiceName=impala
  • Database driver : com.cloudera.impala.jdbc.Driver
  • Driver location : /opt/nifi/ImpalaJDBC42.jar

Pour le driver Impala JDBC, je l’ai téléchargé depuis la page web Cloudera puis unzip dans /opt/nifi en donnant les droits à l’utilisateur nifi dessus. Le processeur utilise un ControllerService Kerberos configuré à plus haut niveau dans Nifi, avec mon keytab (pré-déployé sur l’infrastructure) et mon principal.

On peut imaginer la suite d’un tel cas d’usage Nifi, en utilisant un outil de Datavisualisation pour faire un rapport pertinent et graphiquement plus avenant qu’un output de SELECT SQL.

Si j’avais été dans la partie Public Cloud de Cloudera j’aurais pu utilisé leur outil intégré. Sinon un outil tel que Tableau peut très bien fonctionner.

👉 Envie d’en savoir plus ? Je vous explique ce qu’il en ai aujourd’hui dans cet article consacré à Cloudera Data Plateform ?

Monitoring des flows

Pour m’assurer chaque mois que l’ingestion est bien faite, je suis parti sur un simple processeur PutSlack afin de notifier une channel du nombre d’occurrence insérés en base. Avant d’envoyer la notification, je procède à un MergeRecord afin de n’avoir qu’un flowfile et donc une notification pour l’ensemble des flowfiles récupéré via le système de « pagination » REST. Le MergeRecord utilise un JsonTreeReader configuré par défaut et un JsonRecordSetWriter lui aussi configuré par défaut.

Je finis ma pipeline par un LogAttribute qui se charge de loguer d’éventuelles erreur de notifications Slack.

Cas d'usage Nifi MonitoringWorkflow
Workflow de monitoring de la pipeline

J’aurais pu aller plus loin dans le monitoring / logging, Notamment en redirigeant plus les flux d’erreur vers des systèmes de type PutEmail / PutSlack et LogAttribute.

 

Conclusion

Ce cas d’usage Nifi met en avant l’utilisation conjointe des RecordProcessors, d’une transformation Jolt simple, d’une insertion dans Impala sur un environnement Kerberisé et d’un alerting mensuel.

La finalité reste assez simple. Si on transpose cela avec des flux de données temps réel, le gain est réel et permet, sans trop de code, d’ingérer des données variées dans Impala.

Si vous avez des questions ou souhaitez des précisions sur ce workflow ou sur ce cas d’usage nifi, n’hésitez pas à me contacter !