
Raspberry Pi Home Irrigation - Part 3: Application


Monday, 24 April 2023

This is the third post in a series about building a home irrigation system with a Raspberry Pi that waters the plants in the garden based on the weather.

This post goes over the areas that make up the application within the project: weather checking, job scheduling, notifications, duplicate job prevention and the 'global' progress bar.

The project divides nicely into different sections:


Overview:


There are several parts to this section, each to break down an area of the diagram below from Part 1.

[Diagram: Overview]

Flask



The project has been created in Flask to keep things nice and light. This also makes it easy to take advantage of an abundance of useful packages, such as flask-redis.

Within this project, the same Flask application is run twice: once as a frontend and once as a Celery worker. This can be done on the same machine or on separate machines (as in my case), so long as the two can communicate with each other and with the supporting infrastructure.

Frontend


The frontend serves as a UI for a user to manage weather/location settings and watering schedules. It also tracks the progress of a current watering job and allows the user to request ad-hoc watering from the dashboard.


# /etc/supervisor/conf.d/smart-irrigation.conf

./venv/bin/flask run


The main dashboard shows the current weather and the previous/upcoming rain and watering occurrences for the user's location. The user can also start watering immediately from a button.

[Screenshot: Dashboard]


Users can create Settings Profiles, each holding its own watering schedule preferences.

[Screenshot: Settings]


A Settings Profile contains the Telegram notification channel data, the Location and Weather API information, and the watering schedule configuration.

[Screenshot: Settings Profile]


Users have a profile page where they set their timezone and choose a preferred Settings Profile to appear on their dashboard.

[Screenshot: Profile]


Backend


The backend instance of this project runs as a Celery worker, which is responsible for executing jobs.


# /etc/supervisor/conf.d/smart-irrigation.conf

./venv/bin/celery -A app.celery worker -B

The main database table in the backend is the Settings table which stores all of the watering configuration options. These options are read by a recurring job that will open the solenoid valve based on the set time and duration.


# app/models.py

from sqlalchemy import Column
from sqlalchemy.types import Integer, String
from sqlalchemy.orm import relationship

from app import db

# ..

class Settings(db.Model):

    __tablename__ = "Settings"

    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    description = Column(String(128))

    is_primary = relationship("User")

    watering_start_at = Column(String(32), default="14:00")
    water_duration_minutes = Column(String(32), default="60")

    latitude = Column(String(32))
    longitude = Column(String(32))
    owm_apikey = Column(String(128))

    city = Column(String(32))
    country = Column(String(32))
    timezone = Column(String(128))

    schedule_watering = Column(String(32), default="eod")

    skip_rained_today = Column(String(32), default="1")
    skip_rained_yesterday = Column(String(32), default="1")
    skip_watered_today = Column(String(32), default="1")
    skip_watered_yesterday = Column(String(32), default="1")

    telegram_token = Column(String(128))
    telegram_chat_id = Column(String(128))

    def __repr__(self):
        return str(self.name)


Celery & Celery Once


Celery is used in this project as the task queue. This allows tasks to be executed by the Celery worker which runs on the Raspberry Pi connected to the solenoid valve. To facilitate this, Redis is used as the message broker between Flask and Celery.

Both machines must be able to reach the same Redis endpoint and be configured to use it, via the environment variables below, which are read in config.py:


# .env

CELERY_BROKER_URL = "redis://10.10.10.11:6379/1"
CELERY_RESULT_BACKEND = "redis://10.10.10.11:6379/1"


# config.py

# ..

    CELERY_CONFIG = {
        "broker_url": os.environ.get(
            "CELERY_BROKER_URL"
        ),
        "result_backend": os.environ.get(
            "CELERY_RESULT_BACKEND"
        ),
        "timezone": TIMEZONE,
        "imports": (
            'app.watering',
        ),
    }

There are two configured tasks in this project:

  • check_water_schedule - periodically review all stored watering configurations for any matching the current time. This is executed every minute.
  • water_plants - Open and close the valve as required, and send notifications. This one is executed on demand.

The water_plants task is executed from within the check_water_schedule task, or it can be executed by a user from the dashboard via a button.

To cut down on potential bugs that may leave water running any longer than desired, I looked for a way to prevent multiple water_plants tasks running at any given time.

A simple solution to prevent duplicate Celery tasks being executed or queued is to make use of a project that does exactly that - Celery Once.

The Celery configuration is initialised below as per the Flask documentation and the Celery Once Flask Integration readme. First, Celery is updated with the configuration outlined above, followed by the Celery Once configuration. After that, Task and QueueOnce are each subclassed so that tasks run inside the Flask application context, and the subclasses are made available elsewhere in the project.


# app/__init__.py

from celery import Celery
from celery_once import QueueOnce

# ..

def make_celery(app):

    celery = Celery(
        app.import_name,
    )
    celery.conf.update(app.config.get("CELERY_CONFIG", {}))

    celery.conf.ONCE = {
        'backend': 'celery_once.backends.Redis',
        'settings': {
            'url': app.config['CELERY_BROKER_URL'],
            'default_timeout': 60 * 30
        }
    }

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    class ContextQueueOnce(QueueOnce):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return super(ContextQueueOnce, self).__call__(*args, **kwargs)

    celery.QueueOnce = ContextQueueOnce

    return celery


celery = make_celery(app)
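
To illustrate the idea behind the duplicate prevention, here is a minimal in-memory sketch of a "run once" lock with a timeout. This is not how Celery Once is implemented (it uses the Redis backend configured above); the OnceLock class, the AlreadyLocked exception and the key name are hypothetical and exist only for this example.

```python
# Sketch only: an in-memory analogue of a "queue once" lock
import time


class AlreadyLocked(Exception):
    """Raised when a task with the same key is already queued or running."""


class OnceLock:
    """In-memory stand-in for a lock store (hypothetical, for illustration)."""

    def __init__(self, clock=time.monotonic):
        self._expiry_by_key = {}
        self._clock = clock

    def acquire(self, key, timeout_seconds):
        now = self._clock()
        expiry = self._expiry_by_key.get(key)
        # An unexpired entry means the task is still considered active
        if expiry is not None and expiry > now:
            raise AlreadyLocked(key)
        self._expiry_by_key[key] = now + timeout_seconds

    def release(self, key):
        self._expiry_by_key.pop(key, None)


lock = OnceLock()
lock.acquire("water_plants", timeout_seconds=60 * 60)

try:
    # A second request while the first is active is rejected
    lock.acquire("water_plants", timeout_seconds=60 * 60)
    duplicate_blocked = False
except AlreadyLocked:
    duplicate_blocked = True

lock.release("water_plants")
```

The timeout acts as a safety net: if a worker dies without releasing the lock, the entry expires and the task can be queued again.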

Once Celery has been configured, a schedule is created that runs every minute to check the watering configurations stored in the database. Calling crontab() with no parameters defaults to every minute.


# app/__init__.py

from celery import Celery
from celery.schedules import crontab

# ..

celery.conf.beat_schedule = {

    # Check watering schedule every minute
    'check_water_schedule': {
        'task': 'app.watering.check_water_schedule',
        'schedule': crontab(),
    },
}


SocketIO


The frontend uses Flask-SocketIO so the UI and server communicate via WebSocket.


# smart-irrigation.py

from app import app, socketio

if __name__ == "__main__":
    socketio.run(app)

One use of this is to let the frontend display the connection status to a Celery worker. See the bottom left of each UI screenshot above.

To achieve this, a SocketIO handler is registered on a namespace called /water, listening for a connection event. A namespace acts as a communication channel that separates and organises logic, while the events act as handlers, similar to view functions in Flask.

In the handler below, when a connection event is received in the /water namespace, the availability of the Celery worker is checked. If a worker is available, a confirmation event is emitted back on the same namespace.


# app/routes.py

from flask_socketio import emit

from app import app, socketio
from app.util import get_celery_worker_status

# ..

@socketio.on("connection", namespace="/water")
def confirmation_message(message):

    celery_worker_status = get_celery_worker_status()

    if celery_worker_status["availability"] is not None:

        emit("confirmation", {
            "connection_confirmation": message["connection_confirmation"]
        })
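
The get_celery_worker_status helper is not shown in this post. As a rough sketch of what such a helper could look like, the version below pings the workers through Celery's inspect API and returns a dict with the "availability" key the handler reads. Taking the Celery app as a parameter is an assumption made here to keep the example self-contained; the real helper takes no arguments and may work differently.

```python
# Sketch only: a hypothetical worker status helper
def get_celery_worker_status(celery_app):
    """Return {"availability": ...}; None means no worker replied."""
    try:
        # ping() returns a dict keyed by worker name, or None if
        # no worker replies in time
        replies = celery_app.control.inspect().ping()
    except Exception:
        # Broker unreachable or similar; treat as unavailable
        replies = None

    return {"availability": replies or None}
```

With a worker running, "availability" holds the ping replies keyed by worker name; otherwise it is None, so the handler above emits nothing and the icon stays in its disconnected state.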

Below is the JavaScript used in the template for the sidebar menu in the frontend.

On page load, a connection event is sent to the /water namespace, triggering the logic above.

If a confirmation event is received, an icon on the web page is updated to show a successful connection to the Celery worker, so the user knows they can start watering.


// app/templates/includes/sidebar.html

    <script src="/static/assets/js/jquery.min.js"></script>
    <script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/4.5.1/socket.io.min.js"></script>

    <script>
        $(document).ready(function () {

            var namespace = '/water';
            var socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port + namespace);

            socket.on('connect', function () {
                socket.emit('connection', { connection_confirmation: 'Irrigation Connected' });

            });

            socket.on('confirmation', function (data) {
                var confir_str = data.connection_confirmation;
                document.getElementById("socketStatusIcon").classList.remove('icon-danger');
                document.getElementById("socketStatusIcon").classList.add('icon-success');
                document.getElementById("socketStatusText").textContent = confir_str;
            });
        });
    </script>


Location API



The Location API is used to resolve the profile's latitude and longitude to a City/Country, which is then used for the Weather API. This approach maintains accuracy and reduces the chance of typos or mismatched locations (such as Birmingham, UK and Birmingham, AL).

To achieve this, I'm using data from the OpenStreetMap (OSM) dataset. This data is indexed by Nominatim which is available from the GeoPy library.

This will return a location from a reverse lookup of the latitude and longitude.

For example, 51.5007, -0.1246:

{
  "place_id": 129302133,
  "licence": "Data © OpenStreetMap contributors, ODbL 1.0. https://osm.org/copyright",
  "osm_type": "way",
  "osm_id": 123456789,
  "lat": "51.500690000000006",
  "lon": "-0.12458391796350343",
  "display_name": "Big Ben, Officer's Corridor, Westminster, Lambeth, City of Westminster, Greater London, England, SW1A 0PW, United Kingdom",
  "address": {
    "tourism": "Big Ben",
    "road": "Officer's Corridor",
    "quarter": "Westminster",
    "suburb": "Lambeth",
    "city": "City of Westminster",
    "ISO3166-2-lvl8": "GB-WSM",
    "county": "Greater London",
    "state": "England",
    "ISO3166-2-lvl4": "GB-ENG",
    "postcode": "SW1A 0PW",
    "country": "United Kingdom",
    "country_code": "gb"
  },
  "boundingbox": [
    "51.5006231",
    "51.5007567",
    "-0.1246941",
    "-0.1244737"
  ]
}

# app/location.py

from geopy.geocoders import Nominatim


def get_city_country(latitude, longitude):

    geolocator = Nominatim(user_agent="SmartIrrigation", scheme='https')

    lat_long = f"{latitude}, {longitude}"
    location = geolocator.reverse(lat_long, timeout=3)

    return location.raw

When the user submits the Settings Profile form, the request is passed to the function below, which creates a new profile or updates an existing one.

During this processing, the function above is called to resolve the latitude and longitude.


# app/settings_profile.py

import json

from app import db
from app.models import Settings
from app.location import get_city_country


def create_or_update_settings_profile(request, name):

    # ..

    latitude = request.form.get("latitude")
    longitude = request.form.get("longitude")

    if latitude is not None and longitude is not None:

        # get_city_country already returns a dict, so no JSON
        # round trip is needed
        address = get_city_country(latitude, longitude)["address"]

        if "city" in address:

            city = address["city"]
        else:

            city = address["state"]

        country = address["country"]

    # ..

    new_settings_profile = Settings(

        # ..

        latitude=latitude,
        longitude=longitude,

        # ..

    )

    db.session.add(new_settings_profile)
    db.session.commit()
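
Nominatim only returns the address keys that apply to a given place, so "city" may be missing for smaller settlements. The sketch below shows one way to make the lookup more tolerant. The extract_city_country name and the fallback order are assumptions for illustration, not the logic used in the project.

```python
# Sketch only: tolerant city/country extraction from an address dict
def extract_city_country(address):
    """Pick a city-like name and a country from a Nominatim address dict."""
    # Assumed fallback order, from most to least specific
    for key in ("city", "town", "village", "state"):
        if key in address:
            city = address[key]
            break
    else:
        city = None

    return city, address.get("country")


# Trimmed version of the example response shown earlier
sample_address = {
    "city": "City of Westminster",
    "state": "England",
    "country": "United Kingdom",
}
city, country = extract_city_country(sample_address)
```

Returning None instead of raising a KeyError lets the caller decide whether to reject the form or save the profile without a resolved location.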


Weather API



To access weather information I'm using Open Weather Map (OWM). This service requires signing up to get an API key, but fortunately the free subscription tier covers enough for my needs:

  • 60 calls/minute
  • 1,000,000 calls/month
  • Current Weather
  • 3-hour Forecast 5 days

More specifically, I'm using a Python library - PyOWM - to interact with OWM.

The settings_profile_data variable contains all the data needed to initialise OWM and then process weather data.


# app/weather.py

import json
import pytz
from pyowm import OWM
from datetime import datetime, timedelta

from app import app
from app.models import Watering


def init_owm(settings_profile_data):

    owm = OWM(settings_profile_data["owm_apikey"])
    mgr = owm.weather_manager()

    return mgr

Users can define limits on when watering is allowed, such as every day so long as it didn't rain today or yesterday. This means we need to check historical weather to obtain this information.

The function below takes a user's location along with a day_count variable that dictates how many days ago to retrieve the weather for. It is sometimes called in a loop from 0 to 6 to retrieve the hourly weather from today going back up to 6 days, to find out when it last rained.


# app/weather.py

def get_one_call_history(day_count, settings_profile_data):

    timestamp = get_previous_day_timestamp(day_count, settings_profile_data)

    mgr = init_owm(settings_profile_data)

    weather = mgr.one_call_history(
        dt=timestamp,
        lat=float(settings_profile_data["latitude"]),
        lon=float(settings_profile_data["longitude"])
    )

    return weather.forecast_hourly
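
The loop described above is not shown in this post, so the following is only a sketch of how it might look. It walks back one day at a time and stops at the first day containing rain. The find_last_rain_time name, the injected fetch_hourly callable and the HourlyWeather stand-in are all hypothetical; in the project the data would come from get_one_call_history.

```python
# Sketch only: walking back day by day to find the last rain
from collections import namedtuple

# Hypothetical stand-in for the hourly weather objects, exposing only
# the two attributes this sketch reads
HourlyWeather = namedtuple("HourlyWeather", ["status", "ref_time"])


def find_last_rain_time(fetch_hourly, max_days=7):
    """Return the most recent ref_time with rain, or None if none is found.

    fetch_hourly(day_count) must return the hourly entries for the day
    that many days ago (0 = today).
    """
    for day_count in range(max_days):
        rainy_times = [
            hour.ref_time
            for hour in fetch_hourly(day_count)
            if hour.status.lower() == "rain"
        ]
        if rainy_times:
            # The first day with rain is the most recent one
            return max(rainy_times)
    return None


# Fake data: dry today, rain twice yesterday
fake_history = {
    0: [HourlyWeather("Clouds", 1000), HourlyWeather("Clear", 1100)],
    1: [HourlyWeather("Rain", 500), HourlyWeather("Rain", 600)],
}
last_rain = find_last_rain_time(lambda day: fake_history.get(day, []))
```

Stopping at the first rainy day keeps the number of API calls down, which matters on a rate-limited free tier.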

A user can also skip watering if we know that it will rain later today, so this requires checking upcoming weather.


# app/weather.py

def get_next_rain_date(settings_profile_data):

    next_rain_date = None

    upcoming_weather = get_one_call_current(
        settings_profile_data
    )

    current_time = convert_local_utc(
        datetime.now().timestamp(),
        settings_profile_data
        )

    for hourly_weather in upcoming_weather:
        if hourly_weather.status.lower() == "rain":
            if current_time < hourly_weather.ref_time:

                next_rain_date = convert_local_utc(
                    hourly_weather.ref_time,
                    settings_profile_data
                )

                break

    if next_rain_date is None:

        return "over a week away"
    else:
        return datetime.fromtimestamp(next_rain_date)

Based on the current weather the dashboard can also be updated with the correct icon and weather forecast.


# app/weather.py

def get_current_weather(settings_profile_data):

    mgr = init_owm(settings_profile_data)

    observation = mgr.weather_at_place(
        settings_profile_data["city"] + "," + settings_profile_data["country"]
    )

    return observation.weather


def owm_icon_mapping(weather_code):

    with open('iconmapping.json') as f:
        icon_mapping_data = json.load(f)

        weather_icon = icon_mapping_data[str(weather_code)]["icon"]

    return weather_icon

The route below prepares the dashboard detail.


# app/routes.py

# ..

from app.dashboard import get_dashboard_data

# ..

@blueprint.route("/dashboard")
@login_required
def dashboard():

    active_icon = "dashboard"

    dashboard_data = get_dashboard_data()

    return render_template(
        "dashboard.html",
        dashboard_data=dashboard_data,
        segment=active_icon
    )

# ..


# app/dashboard.py

# ..

from app.weather import get_current_weather, owm_icon_mapping
from app.weather import get_last_rain_date, get_next_rain_date
from app.weather import get_last_water_date, get_next_water_date


def get_dashboard_data():

    if current_user.primary_profile_id is not None:

        # Try to generate dashboard data relevant to the
        # user's preferred settings profile.

        settings_profile_data = Settings.query.filter_by(
            id=current_user.primary_profile_id
        ).first()

    elif Settings.query.first() is not None:

        # Otherwise just generate dashboard data for the
        # first settings profile configured in the db.

        settings_profile_data = Settings.query.first()

    else:

        # If there isn't any settings data configured yet
        # then return data to inform user of this.

        dashboard_data = {}

        current_date = datetime.now(
            pytz.timezone("Europe/London")
        )

        dashboard_data.update(
            {
                "weather_detail": "No Settings Profile Found",
                "weather_icon": "wi wi-na",
                "current_date": current_date,
                "last_rain_date": "API Key Required",
                "next_rain_date": "API Key Required",
                "last_water_date": "N/A",
                "last_water_duration": "N/A",
                "next_water_date": "API Key Required"
            }
        )

        return dashboard_data

    dashboard_data = generate_dashboard_data(settings_profile_data)

    return dashboard_data


def generate_dashboard_data(settings_profile_data):

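    dashboard_data = {}

    # Copy the Settings columns into a plain dict, as done in
    # check_water_schedule, since the helpers below read values by key
    profile = settings_profile_data
    settings_profile_data = {
        column.name: str(getattr(profile, column.name))
        for column in profile.__table__.columns
    }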

    try:

        current_weather = get_current_weather(
            settings_profile_data
        )

        weather_detail = current_weather.detailed_status

        current_date = datetime.now(
            pytz.timezone(settings_profile_data["timezone"])
        )

        if current_date.hour >= 6 and current_date.hour <= 20:

            prefix = "wi wi-day-"
        else:
            prefix = "wi wi-night-"

        weather_icon = prefix + owm_icon_mapping(
            current_weather.weather_code
        )

        if weather_icon == "wi wi-night-sunny":
            weather_icon = "wi wi-night-clear"

        last_rain_date = get_last_rain_date(
            settings_profile_data
        )

        next_rain_date = get_next_rain_date(
            settings_profile_data
        )

        last_water_date, last_water_duration = get_last_water_date()

        next_water_date = get_next_water_date(
            settings_profile_data
        )

        dashboard_data.update(
            {
                "weather_detail": weather_detail,
                "weather_icon": weather_icon,
                "city": settings_profile_data["city"],
                "current_date": current_date,
                "last_rain_date": last_rain_date,
                "next_rain_date": next_rain_date,
                "last_water_date": last_water_date,
                "last_water_duration": last_water_duration,
                "next_water_date": next_water_date
            }
        )

    # ..

    return dashboard_data
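
The icon selection above can be isolated into a small pure function, which makes the day/night rule easy to test. This is a sketch under assumptions: build_icon_class is a hypothetical name, and the mapping excerpt only mimics the shape that owm_icon_mapping reads (the icon names in the real iconmapping.json may differ).

```python
# Sketch only: day/night icon class selection
def build_icon_class(icon_name, hour):
    """Combine a day/night prefix with a mapped icon name."""
    prefix = "wi wi-day-" if 6 <= hour <= 20 else "wi wi-night-"
    icon_class = prefix + icon_name

    # Same special case as in the function above
    if icon_class == "wi wi-night-sunny":
        icon_class = "wi wi-night-clear"

    return icon_class


# Hypothetical excerpt in the shape that owm_icon_mapping() reads
icon_mapping_data = {"800": {"icon": "sunny"}, "500": {"icon": "rain"}}

midday_icon = build_icon_class(icon_mapping_data["800"]["icon"], hour=12)
midnight_icon = build_icon_class(icon_mapping_data["800"]["icon"], hour=0)
```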


Notifications



Part of the Settings Profile data includes a Telegram API Token and an ID to represent a Chat. In my case it's just a group chat containing me and a bot.

Since the Telegram variables are available from the settings profile, the profile is simply passed to the function below along with the text to send:


# app/notifications.py

import urllib.parse
import requests
from app import app


def telegram_notify(notification, settings_profile_data):

    telegram_api_endpoint = \
        'https://api.telegram.org/bot%s/sendMessage?chat_id=%s&text=%s' % (
                settings_profile_data.telegram_token,
                settings_profile_data.telegram_chat_id,
                urllib.parse.quote_plus(str(notification))
        )

    response = requests.get(telegram_api_endpoint, timeout=10)
    app.logger.info(response.text)
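
As an alternative to formatting the query string by hand, the standard library can encode both parameters together. The sketch below only builds the URL, so it can be tried without sending anything. The build_telegram_url name and the credentials are placeholders, not values from the project.

```python
# Sketch only: building the sendMessage URL with urlencode
from urllib.parse import urlencode


def build_telegram_url(token, chat_id, text):
    """Build a sendMessage URL with encoded query parameters."""
    query = urlencode({"chat_id": chat_id, "text": text})
    return "https://api.telegram.org/bot%s/sendMessage?%s" % (token, query)


# Placeholder credentials, not real values
url = build_telegram_url("123:ABC", "-100200300", "Watering started & running")
```

Encoding chat_id as well as the text avoids surprises if either value ever contains characters such as & or #.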


The notifications are sent from the function that actually opens and closes the solenoid valve. That function already needs a few values from settings_profile_data for watering, so it's simple enough to pass the same data to telegram_notify.


# app/watering.py

from app.notifications import telegram_notify

def water_plants(settings_profile_data, duration_seconds, is_adhoc_request):

    rain = u'\U00002614'

    # ..

        telegram_notify(
            rain + " - Scheduled watering has started at: " +
            str(datetime.datetime.now().strftime("%d-%m-%Y %H:%M")),
            settings_profile_data
        )

    # ..

        telegram_notify(
            rain + " - Scheduled watering has completed at: " +
            str(datetime.datetime.now().strftime("%d-%m-%Y %H:%M")),
            settings_profile_data
        )

    # ..


Watering Schedules



The majority of the logic covered above comes together when it's time to check schedules and actually water the plants.

Earlier when configuring Celery, a schedule was created that calls check_water_schedule every minute. This function performs a series of checks to determine whether or not to water at a given time.

First, it cycles through all of the Settings model objects to find out whether any are scheduled to run at the current time. If so, the next check is whether today is the correct date to water. This is needed because a user has several options in their settings profile to skip watering based on weather or recent watering occurrences.

If everything lines up, then the water_plants task is called, with the duration of watering from the relevant settings_profile data.


# app/watering.py

import time
import datetime

from flask_socketio import SocketIO

from app import app, celery, db
from app.models import Settings, Watering
from app.notifications import telegram_notify
from app.weather import get_next_water_date


# Runs every minute
@celery.task
def check_water_schedule():

    # Compare schedules against the current time

    current_hour_minute = (
        datetime.datetime.now()
    ).strftime("%H:%M")

    with app.app_context():

        settings_profiles = Settings.query.all()

    for settings_profile in settings_profiles:

        if settings_profile.watering_start_at == str(current_hour_minute):

            # If we're here we've got a matching time
            # Grab all of the settings_profile config
            settings_profile_data = {}

            for column in settings_profile.__table__.columns:
                settings_profile_data[column.name] = str(
                    getattr(
                        settings_profile,
                        column.name
                    )
                )

            # Now check whether it's the correct date
            next_water_date = get_next_water_date(
                settings_profile_data
            )

            # If it's the correct date then we have a complete match
            if (next_water_date.strftime("%Y-%m-%d %H:%M") ==
                    datetime.datetime.now().strftime("%Y-%m-%d %H:%M")):

                # Obtain the watering duration
                duration_seconds = int(
                    settings_profile_data["water_duration_minutes"]
                ) * 60

                # Actually start watering
                water_plants.delay(
                    settings_profile_data["id"],
                    duration_seconds,
                    1
                )
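
The time match at the start of the task is a plain string comparison, which the sketch below isolates using dicts in place of the Settings rows. The is_due and due_profiles names and the profile names are made up for illustration.

```python
# Sketch only: matching stored "HH:MM" strings against the clock
from datetime import datetime


def is_due(watering_start_at, now):
    """True when the stored "HH:MM" string matches the given time."""
    return watering_start_at == now.strftime("%H:%M")


def due_profiles(profiles, now):
    """Filter profile dicts down to those scheduled for this minute."""
    return [p for p in profiles if is_due(p["watering_start_at"], now)]


profiles = [
    {"name": "Front garden", "watering_start_at": "14:00"},
    {"name": "Greenhouse", "watering_start_at": "06:30"},
]
matches = due_profiles(profiles, datetime(2023, 4, 24, 14, 0, 45))
```

Because this compares strings, the stored value has to be zero-padded: "6:30" would never match "06:30".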


When it comes to finding out if today would be the correct date for watering, based on the configuration of a given settings profile, there are more checks that must be done.

Users can configure their watering schedule by setting the following options to their preference (as seen in the Settings Profile UI screenshot):

  • The time of day watering should run
  • The duration in minutes that watering should occur for
  • Water every day or every other day
  • Skip watering if it has already rained today
  • Skip watering if it rained yesterday
  • Skip watering if watering has already occurred today
  • Skip watering if watering occurred yesterday

In the function below, the flags for these checks are first set to defaults, then updated from the given settings profile along with the weather and watering history.


# app/weather.py

from datetime import datetime, timedelta

# ..

def get_next_water_date(settings_profile_data):

    latest_rain_date = None
    if_rained_today = False
    if_rained_yesterday = False
    if_watered_today = False
    if_watered_yesterday = False
    eod_skip = False

    # Check if user has actually set their preferred watering time
    if (settings_profile_data["watering_start_at"] == "" or
            settings_profile_data["watering_start_at"] is None):

        return "Configuration Not Set"

    # Get the current datetime to manipulate to match the user settings
    current_utc_timestamp = convert_local_utc(
        datetime.now().timestamp(),
        settings_profile_data
    )

    current_datetime = datetime.fromtimestamp(current_utc_timestamp)

    # Modify the datetime's hour/minute values to match user settings
    modified_datetime = current_datetime.replace(
        hour=int(settings_profile_data["watering_start_at"].split(":")[0]),
        minute=int(settings_profile_data["watering_start_at"].split(":")[1]),
        second=0,
        microsecond=0
    )

    # Set the seconds to 0 as we only care about the hour/minute for comparison
    current_datetime = current_datetime.replace(
        second=0,
        microsecond=0
    )

    # If the hour/minute setting has already passed today, set day to tomorrow
    if modified_datetime >= current_datetime:

        next_water_date = modified_datetime
    else:

        next_water_date = modified_datetime + timedelta(days=1)

    # Store the last saved water time
    last_water_date, last_water_duration = get_last_water_date()

    if last_water_date != "never":

        # boolean for whether last watered date was today
        if last_water_date.date() == current_datetime.date():

            if_watered_today = True
            eod_skip = True

        # boolean for whether last watered date was yesterday
        if last_water_date.date() == (
            current_datetime.date() - timedelta(days=1)
        ):

            if_watered_yesterday = True
            eod_skip = True

    # If user settings are impacted by rainfall:
    #   - store upcoming rain
    #   - store if it rained yesterday
    #   - store if it rained today
    if (settings_profile_data["skip_rained_today"] == "on" or
            settings_profile_data["skip_rained_yesterday"] == "on"):

        upcoming_weather = get_one_call_current(
            settings_profile_data
        )

        for hourly_weather in reversed(upcoming_weather):

            if hourly_weather.status.lower() == "rain":

                latest_rain_date = datetime.fromtimestamp(
                    convert_utc_local(
                        hourly_weather.ref_time,
                        settings_profile_data
                    )
                )
                break

        if latest_rain_date is None:

            latest_rain_date = "unknown"

        # This will only check today's weather up to the current time,
        # not until the end of the day.
        todays_weather_so_far = get_one_call_history(
            0,
            settings_profile_data
        )

        # Check and store yesterday's weather
        # so we can check for rain
        yesterdays_weather = get_one_call_history(
            1,
            settings_profile_data
        )

        # Check if it rained today, store the boolean
        for todays_weather_statuses in todays_weather_so_far:

            if todays_weather_statuses.status.lower() == "rain":

                if_rained_today = True
                eod_skip = True
                break

        # If it hasn't rained today so far, it still might later
        # Check the upcoming weather to find out
        if if_rained_today is False:

            forecast_rain = get_next_rain_date(
                settings_profile_data
            )

            # No rain today
            if forecast_rain == "over a week away":

                if_rained_today = False
                eod_skip = True

            # It will rain today
            elif forecast_rain.date() <= current_datetime.date():

                if_rained_today = True
                eod_skip = True

        # Did it rain yesterday?
        for weather_statuses in yesterdays_weather:

            if weather_statuses.status.lower() == "rain":

                if_rained_yesterday = True
                eod_skip = True
                break

    # If there's no historical watering and the user settings are not
    # impacted by rainfall, then water at the next time occurrence
    if (last_water_date == "never" and
            ((settings_profile_data["skip_rained_today"] != "on" and
                settings_profile_data["skip_rained_yesterday"] != "on") or
                latest_rain_date == "unknown")):

        return next_water_date

    else:

        # Based on the rain data above, and all of the
        # user settings combinations, find out if 
        # we need to water or not.

        if (settings_profile_data["skip_watered_today"] == "on"
                and if_watered_today is True):

            if next_water_date.date() <= current_datetime.date():

                next_water_date = next_water_date + timedelta(days=1)

        if (settings_profile_data["skip_watered_yesterday"] == "on"
                and if_watered_yesterday is True):

            if next_water_date.date() <= current_datetime.date():
                next_water_date = next_water_date + timedelta(days=1)

        if (settings_profile_data["skip_rained_today"] == "on"
                and if_rained_today is True):

            if next_water_date.date() <= current_datetime.date():
                next_water_date = next_water_date + timedelta(days=1)

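            # latest_rain_date is the string "unknown" when no rain is forecast
            if (isinstance(latest_rain_date, datetime) and
                    next_water_date.date() == latest_rain_date.date()):
                next_water_date = next_water_date + timedelta(days=1)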

        if (settings_profile_data["skip_rained_yesterday"] == "on"
                and if_rained_yesterday is True):

            if next_water_date.date() <= current_datetime.date():
                next_water_date = next_water_date + timedelta(days=1)

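            # latest_rain_date is the string "unknown" when no rain is forecast
            if (isinstance(latest_rain_date, datetime) and
                    next_water_date.date() ==
                    (latest_rain_date + timedelta(days=1)).date()):

                next_water_date = next_water_date + timedelta(days=1)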

        # Based on all of the above, add a day if the user
        # wants to water every other day.
        if (settings_profile_data["schedule_watering"] == "eod"
                and eod_skip is True):

            next_water_date = next_water_date + timedelta(days=1)

        return next_water_date
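
Stripped of the weather lookups, the core of the function above is "find the next occurrence of HH:MM, then push it forward a day for each skip rule that applies". The sketch below shows that core with plain datetimes. It is a simplification, not a replacement: next_occurrence and apply_skips are hypothetical names, and timezone conversion is left out.

```python
# Sketch only: the date arithmetic without weather or database access
from datetime import datetime, timedelta


def next_occurrence(watering_start_at, now):
    """Return the next datetime at which "HH:MM" occurs, from now."""
    hour, minute = (int(part) for part in watering_start_at.split(":"))
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    now = now.replace(second=0, microsecond=0)

    # A time earlier in the day has already passed, so use tomorrow
    if candidate < now:
        candidate += timedelta(days=1)
    return candidate


def apply_skips(next_date, today, skip_today):
    """Push the date forward one day if today must be skipped."""
    if skip_today and next_date.date() <= today:
        next_date += timedelta(days=1)
    return next_date


now = datetime(2023, 4, 24, 15, 30)
later_today = next_occurrence("18:00", now)
tomorrow = next_occurrence("14:00", now)
skipped = apply_skips(later_today, now.date(), skip_today=True)
```

Keeping the arithmetic in pure functions like these makes it possible to test the schedule rules without calling the weather API.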



Watering



Finally, the function below is responsible for the watering itself: it opens the solenoid valve, waits for the set duration, then closes it. It is called either ad hoc by a user from the Dashboard, or from the check_water_schedule task covered above.

The application can optionally be run in a demo mode which won't attempt to connect to the GPIO pins. This is useful for debugging or testing changes in local environments.

This section also drives the progress bar seen in the Dashboard screenshots, again using SocketIO, but I'll cover that in a separate post.


# app/watering.py

# ..

@celery.task(
    base=celery.QueueOnce,
    once={'timeout': 60 * 60}
)
def water_plants(profile_id, duration_seconds, is_adhoc_request):


    settings_profile_data = Settings.query.filter_by(
        id=profile_id
    ).first()

    rain = u'\U00002614'

    socketio = SocketIO(message_queue=app.config['CELERY_BROKER_URL'])

    start_time = datetime.datetime.now()

    # Open the valve and send notification

    if app.config['DEMO_MODE'] == "False":

        import RPi.GPIO as GPIO

        telegram_notify(
            rain + " - Scheduled watering has started at: " +
            str(datetime.datetime.now().strftime("%d-%m-%Y %H:%M")),
            settings_profile_data
        )

        GPIO.setmode(GPIO.BCM)
        GPIO_PIN = 21

        GPIO.setup(GPIO_PIN, GPIO.OUT)
        GPIO.output(GPIO_PIN, False)

        GPIO.output(GPIO_PIN, True)

    else:

        app.logger.info(
            "DEMO MODE ENABLED for Settings Profile: " +
            settings_profile_data.name +
            " has started watering."
        )

    # Send progress updates every second for the
    # duration of the desired watering

    for i in range(duration_seconds):

        socketio.emit("short_response", {
            "current": i,
            "total": duration_seconds,
            "status": "Watering in progress..."
        }, namespace="/water")

        time.sleep(1)

    # Store this event in the database

    new_water_event = Watering(
        water_start_time=start_time,
        water_end_time=datetime.datetime.now(),
        adhoc_request=is_adhoc_request,
        status="completed",
        water_duration_minutes=int((duration_seconds / 60))
    )

    db.session.add(new_water_event)
    db.session.commit()

    # Close the valve and send notification

    if app.config['DEMO_MODE'] == "False":

        GPIO.output(GPIO_PIN, False)
        telegram_notify(
            rain + " - Scheduled watering has completed at: " +
            str(datetime.datetime.now().strftime("%d-%m-%Y %H:%M")),
            settings_profile_data
        )
        GPIO.cleanup()

    else:

        app.logger.info(
            "DEMO MODE ENABLED for Settings Profile: " +
            settings_profile_data.name +
            " has completed watering."
        )

    # End the progress bar

    socketio.emit('short_response', {
        "current": 100,
        "total": 100,
        "status": "Done"
        }, namespace='/water')
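
In the task above, an exception raised while the valve is open (for example from a progress update or the database commit) would skip the code that closes it. One possible safeguard, sketched here with fake callbacks instead of GPIO, is to wrap the waiting period in try/finally. The run_valve function and its parameters are hypothetical and not part of the project.

```python
# Sketch only: guaranteeing the valve is closed with try/finally
import time


def run_valve(open_valve, close_valve, duration_seconds, on_tick=None,
              sleep=time.sleep):
    """Open the valve, wait, and always close it again."""
    open_valve()
    try:
        for elapsed in range(duration_seconds):
            if on_tick is not None:
                on_tick(elapsed, duration_seconds)
            sleep(1)
    finally:
        # Runs even if a progress update or sleep raises
        close_valve()


# Demo with fake valve callbacks and no real sleeping
events = []


def failing_tick(elapsed, total):
    if elapsed == 2:
        raise RuntimeError("simulated failure")


try:
    run_valve(
        open_valve=lambda: events.append("open"),
        close_valve=lambda: events.append("close"),
        duration_seconds=5,
        on_tick=failing_tick,
        sleep=lambda seconds: None,
    )
except RuntimeError:
    events.append("error")
```

In the demo the valve is closed before the error reaches the caller, which is the behaviour wanted for anything that controls running water.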



Next up



The next part goes over the infrastructure that holds this project together: Part 4 - Infrastructure (coming soon).

