Added queue to handle sending of subscribers notifications

This commit is contained in:
thnilsen 2020-06-16 20:11:25 +02:00
parent 62e6f08e4f
commit e3d8e3ecf8
5 changed files with 351 additions and 64 deletions

View File

@ -18,6 +18,9 @@ if (isset($_GET['delete']))
if (isset($_GET['tasks'])) {
Template::render_header(_("Dashboard"), true);

View File

@ -40,7 +40,6 @@ class Mailer {
$elements = explode('@', $to);
$domainpart = EncodePunycodeIDN(array_pop($elements)); // Convert domain part to ascii
$to = $elements[0] . '@' . $domainpart; // Reassemble tge full email address
syslog(1,"email: " .$to);
// Send using PHP mailer if it is enabled

View File

@ -1,5 +1,6 @@
require_once(__DIR__ . "/queue.php");
* Class that encapsulates everything that can be done with notifications
@ -54,11 +55,69 @@ class Notification
global $mysqli;
// Fetch list of unique subscribers for given service
// Direct inclusion of variable withour using prepare justified by the fact that
// Direct inclusion of variable without using prepare justified by the fact that
// this->serviceids are not user submitted
$sql = "SELECT DISTINCT subscriberIDFK FROM services_subscriber WHERE serviceIDFK IN (" . $this->serviceids . ")";
$query = $mysqli->query($sql);
// Create the queue tasks for email/telegram notifications
$queue = new Queue();
$queue->status = $queue->all_status['populating'];
$queue->user_id = $_SESSION['user'];
$arr_data = array();
$arr_data = $this->prepare_email(); // Make up the base message and subject for email
$queue->type_id = $queue->all_type_id['notify_email'];
$queue->template_data1 = $arr_data['subject'];
$queue->template_data2 = $arr_data['body'];
$task_id_email = $queue->add_task();
syslog(1, "queue email: ". $task_id_email);
$arr_email = array();
$arr_data = $this->prepare_telegram();
$queue->type_id = $queue->all_type_id['notify_telegram'];
$queue->template_data1 = null;
$queue->template_data2 = $arr_data['body'];
$task_id_telegram = $queue->add_task();
syslog(1, "queue telegram: ". $task_id_telegram);
$arr_telegram = array();
while ($subscriber = $query->fetch_assoc()) {
// Fetch list of subscriber details for already found subscriber IDs
$stmt = $mysqli->prepare("SELECT typeID FROM subscribers WHERE subscriberID = ? AND active=1");
$stmt->bind_param("i", $subscriber['subscriberIDFK']);
$subscriberQuery = $stmt->get_result();
while ($subscriberData = $subscriberQuery->fetch_assoc()) {
$typeID = $subscriberData['typeID']; // Telegram = 1, email = 2
// Handle telegram
if ($typeID == 1 && SUBSCRIBE_TELEGRAM) {
$arr_telegram[] = $subscriber['subscriberIDFK'];
// Handle email
if ($typeID == 2 && SUBSCRIBE_EMAIL) {
$arr_email[] = $subscriber['subscriberIDFK'];
$queue->task_id = $task_id_telegram;
$queue->add_notification($arr_telegram); // Add array of Telegram users to the notification queue list
$queue->task_id = $task_id_email;
$queue->add_notification($arr_email); // Add array of Email users to the notification queue list
/* OLD CODE to get user email/telegram data) - Move to queue handler...
while ($subscriber = $query->fetch_assoc()) {
// Fetch list of subscriber details for already found subscriber IDs
$stmt = $mysqli->prepare("SELECT typeID, userID, firstname, token FROM subscribers WHERE subscriberID = ? AND active=1");
@ -82,27 +141,33 @@ class Notification
$this->submit_email($userID, $token);
* Sends Telegram notification message using their web api.
* @param string $userID The Telegram userid to send to
* @param string $firstname The users firstname
* @return boolean
* @param string $msg Body of message
* @return boolean true = Sent / False = failed
public function submit_telegram($userID, $firstname)
public function submit_queue_telegram($userID, $firstname, $msg)
// TODO Handle limitations (Max 30 different subscribers per second)
// TODO Error handling
$msg = _("Hi %s!\nThere is a status update for service(s): %s\nThe new status is: %s\nTitle: %s\n\n%s\n\n<a href='%s'>View online</a>");
$msg = sprintf($msg, $firstname, $this->servicenames, $this->status, $this->title, $this->text, WEB_URL);
$msg = sprintf($msg, $firstname);
$tg_message = urlencode($msg);
$response = json_decode(file_get_contents("" . TG_BOT_API_TOKEN . "/sendMessage?chat_id=" . $userID . "&parse_mode=HTML&text=" . $tg_message), true);
if (! array_key_exists("ok", $response) || $response['ok'] != 1 ) {
$tg_message = array('text' => $msg, 'chat_id' => $userID, 'parse_mode' => 'HTML');
$json = @file_get_contents("" . TG_BOT_API_TOKEN . "/sendMessage?" . http_build_query($tg_message) );
$response = json_decode($json, true);
if (!is_array($response) || ! array_key_exists("ok", $response) || $response['ok'] != 1 ) {
syslog(1, "telegram failed: ".$userID);
return false;
syslog(1,"telegram ok: " .$userID);
return true;
@ -113,12 +178,58 @@ class Notification
* @param String $uthkey Users token for managing subscription
* @return void
public function submit_email($userID, $token)
public function submit_queue_email($subscriber, $subject, $msg)
// TODO Error handling
$Parsedown = new Parsedown();
//$Parsedown = new Parsedown();
$mailer = new Mailer();
if ( ! $mailer->send_mail($subscriber, $subject, $msg, true) ) {
syslog(1, "email failed: " .$subscriber);
return false;
syslog(1, "email ok: " .$subscriber);
return true;
// /**
// * Sends email notifications to a subscriber.
// * Function depends on Parsedown and Mailer class being loaded.
// * @param String $userID The email address to send to
// * @param String $uthkey Users token for managing subscription
// * @return void
// */
// public function submit_email_old($userID, $token)
// {
// // TODO Error handling
// //$Parsedown = new Parsedown();
// $mailer = new Mailer();
// $str_mail = file_get_contents("../libs/templates/email_status_update.html");
// $str_mail = str_replace("%name%", NAME, $str_mail);
// // $smtp_mail = str_replace("%email%", $userID, $smtp_mail);
// $str_mail = str_replace("%url%", WEB_URL, $str_mail);
// $str_mail = str_replace("%service%", $this->servicenames, $str_mail);
// $str_mail = str_replace("%status%", $this->status, $str_mail);
// $str_mail = str_replace("%time%", date("c", $this->time), $str_mail);
// $str_mail = str_replace("%comment%", $Parsedown->setBreaksEnabled(true)->text($this->text), $str_mail);
// $str_mail = str_replace("%token%", $token, $str_mail);
// $str_mail = str_replace("%service_status_update_from%", _("Service status update from"), $str_mail);
// $str_mail = str_replace("%services_impacted%", _("Service(s) Impacted"), $str_mail);
// $str_mail = str_replace("%status_label%", _("Status"), $str_mail);
// $str_mail = str_replace("%time_label%", _("Time"), $str_mail);
// $str_mail = str_replace("%manage_subscription%", _("Manage subscription"), $str_mail);
// $str_mail = str_replace("%unsubscribe%", _("Unsubscribe"), $str_mail);
// $str_mail = str_replace("%powered_by%", _("Powered by"), $str_mail);
// $subject = _('Status update from') . ' - ' . NAME . ' [ ' . $this->status . ' ]';
// $mailer->send_mail($userID, $subject, $str_mail);
// }
public function prepare_email(){
$Parsedown = new Parsedown();
$str_mail = file_get_contents("../libs/templates/email_status_update.html");
$str_mail = str_replace("%name%", NAME, $str_mail);
// $smtp_mail = str_replace("%email%", $userID, $smtp_mail);
@ -127,7 +238,7 @@ class Notification
$str_mail = str_replace("%status%", $this->status, $str_mail);
$str_mail = str_replace("%time%", date("c", $this->time), $str_mail);
$str_mail = str_replace("%comment%", $Parsedown->setBreaksEnabled(true)->text($this->text), $str_mail);
$str_mail = str_replace("%token%", $token, $str_mail);
//$str_mail = str_replace("%token%", $token, $str_mail);
$str_mail = str_replace("%service_status_update_from%", _("Service status update from"), $str_mail);
$str_mail = str_replace("%services_impacted%", _("Service(s) Impacted"), $str_mail);
@ -136,7 +247,18 @@ class Notification
$str_mail = str_replace("%manage_subscription%", _("Manage subscription"), $str_mail);
$str_mail = str_replace("%unsubscribe%", _("Unsubscribe"), $str_mail);
$str_mail = str_replace("%powered_by%", _("Powered by"), $str_mail);
$subject = _('Status update from') . ' - ' . NAME . ' [ ' . $this->status . ' ]';
$mailer->send_mail($userID, $subject, $str_mail);
$val = array();
$val['subject'] = $subject;
$val['body'] = $str_mail;
return $val;
public function prepare_telegram(){
$msg = _("Hi #s!\nThere is a status update for service(s): %s\nThe new status is: %s\nTitle: %s\n\n%s\n\n<a href='%s'>View online</a>");
$val['body'] = sprintf($msg, $this->servicenames, $this->status, $this->title, $this->text, WEB_URL);
return $val;

classes/queue.php Normal file
View File

@ -0,0 +1,163 @@
* Class for creating and managing the queue system
class Queue
public $task_id;
public $type_id;
public $status;
public $template_data1; // i.e. Subject for email
public $template_data2; // i.e. HTML email body
public $create_time;
public $completed_time;
public $num_errors;
public $user_id;
public $all_type_id = array('notify_telegram' => 1,
'notify_email' => 2);
public $all_status = array('populating' => 1,
'ready' => 2,
'processing' => 3,
'completed' => 4,
'failed' => 5);
public function add_task() {
global $mysqli;
$stmt = $mysqli->prepare("INSERT INTO queue_task (type_id, status, template_data1, template_data2, created_time, user_id) VALUES (?,?,?,?,?,?)");
if ( false===$stmt ) {
//die('prepare() failed: ' . htmlspecialchars($mysqli->error));
echo $mysqli->errno();
#if ( false === $stmt ) { syslog(1, "Error :". $mysqli->error); }
$now = time();
$res = $stmt->bind_param("iissii", $this->type_id, $this->status, $this->template_data1, $this->template_data2, $now, $this->user_id);
if ( false === $res ) {
echo "error";
$query = $stmt->get_result();
print $query;
$this->task_id = $mysqli->insert_id;
return $this->task_id;
* Remove task from the queue
* @return void
public function delete_task($task_id){
global $mysqli;
$stmt = $mysqli->prepare("DELETE FROM queue_task WHERE id = ?");
$stmt->bind_param("i", $task_id);
* Update status for given task
* @param int $new_status The new current status of the task. Must be selected from the $all_status array.
* @return void
public function set_task_status($new_status) {
global $mysqli;
$stmt = $mysqli->prepare("UPDATE queue_task SET status = ? WHERE id = ?");
$stmt->bind_param("ii", $new_status, $this->task_id);
$this->status = $new_status;
* Add notification queue data for given task
* @param array $arr_data Array filled with subscriber_id
* @return void
public function add_notification($arr_data) {
global $mysqli;
//Default status = 1, retres = 0, task_id = $this->task_id
// Build query manually since mysqli doesn't cater well for multi insert..
$count = count($arr_data); // Let's find number of elements
$counter = 0;
$query = '';
$seperator = ',';
$sub_query = '(%d, %d, %d ,%d)%s';
foreach ($arr_data as $value) {
if ($counter == $count) { $seperator = ''; } // Make sure last character for SQL query is correct
$query .= sprintf($sub_query, $this->task_id, 1, $value, 0, $seperator);
$sql = "INSERT INTO queue_notify (task_id, status, subscriber_id, retries) VALUES ". $query;
$this->set_task_status($this->all_status['ready']); // Make task available for release
public function update_notfication_retries($task_id, $subscriber_id) {
global $mysqli;
$stmt = $mysqli->prepare("UPDATE queue_notify SET retries = retries+1 WHERE task_id = ? AND subscriber_id = ?");
$stmt->bind_param("ii", $task_id, $subscriber_id);
public function delete_notification($task_id, $subscriber_id) {
global $mysqli;
$stmt = $mysqli->prepare("DELETE FROM queue_notify WHERE task_id = ? AND subscriber_id = ?");
$stmt->bind_param("ii", $task_id, $subscriber_id);
// TODO: Fix max attempts for notifications
public function process_queue(){
global $mysqli;
$stmt = $mysqli->query("SELECT, qn.task_id, qn.status, qn.subscriber_id, qn.retries, sub.firstname, sub.userID, sub.token FROM queue_notify AS qn INNER JOIN subscribers AS sub ON qn.subscriber_id = sub.subscriberID WHERE qn.status NOT LIKE 2 AND");
while ( $result = $stmt->fetch_assoc() ) {
$i = 2;
$stmt2 = $mysqli->prepare("SELECT * FROM queue_task WHERE id = ? AND status = ?");
$stmt2->bind_param("ii", $result['task_id'], $i);
$tmp = $stmt2->get_result();
$result2 = $tmp->fetch_assoc();
$typeID = $result2['type_id'];
syslog(1, $typeID . " " . $result['task_id'] . " " . $result['userID']);
// Handle telegram
if ($typeID == 1) {
$msg = str_replace("#s", $result['firstname'], $result2['template_data2']);
if ( ! Notification::submit_queue_telegram($result['userID'], $result['firstname'], $msg) ) {
Queue::update_notfication_retries($result['task_id'], $result['subscriber_id']); // Sent
} else {
Queue::delete_notification($result['task_id'], $result['subscriber_id']); // Failed
// Handle email
if ($typeID == 2) {
$msg = str_replace("%token%", $result['token'], $result2['template_data2']);
if ( ! Notification::submit_queue_email($result['userID'], $result2['template_data1'], $msg) ) {
Queue::update_notification_retries($result['task_id'], $result['subscriber_id']); // Sent
} else {
Queue::delete_notification($result['task_id'], $result['subscriber_id']); // Failed
// Check if queue log is empty and if so delete the queue_task
$stmt = $mysqli->query("SELECT id, (SELECT COUNT(*) FROM queue_notify AS qn WHERE qn.task_id = AS count FROM queue_task");
while ( $result = $stmt->fetch_assoc() ) {
if ( $result['count'] == 0 ) {