Pluging Wallbox

Code Python fichier test.PY

from wallbox import Wallbox, Statuses
import time
import datetime

w = Wallbox("user@email", "password")

# Authenticate with the credentials above
w.authenticate()

# Print a list of chargers in the account
print(w.getChargersList())

# Get charger data for all chargers in the list, then lock and unlock chargers
for chargerId in w.getChargersList():
    chargerStatus = w.getChargerStatus(chargerId)
    print(f"Charger Status: {chargerStatus}")
    print(f"Lock Charger {chargerId}")
    endDate = datetime.datetime.now()
    startDate = endDate - datetime.timedelta(days = 30)
    sessionList = w.getSessionList(chargerId, startDate, endDate)
    print(f"Session List: {sessionList}")
    w.lockCharger(chargerId)
    time.sleep(10)
    chargerStatus = w.getChargerStatus(chargerId)
    print(f"Charger {chargerId} lock status {chargerStatus['config_data']['locked']}")
    print(f"Unlock Charger {chargerId}")
    w.unlockCharger(chargerId)
    time.sleep(10)
    chargerStatus = w.getChargerStatus(chargerId)
    print(f"Charger {chargerId} lock status {chargerStatus['config_data']['locked']}")

    # Print the status the charger is currently in using the status id
    print(f"Charger Mode: {Statuses(chargerStatus['status_id']).name}")

Code python du fichier wallbox.py

"""

Wallbox class

"""

from datetime import datetime
from time import timezone
from requests.auth import HTTPBasicAuth
import requests
import json


class Wallbox:
    def __init__(self, username, password, requestGetTimeout = None):
        self.username = username
        self.password = password
        self._requestGetTimeout = requestGetTimeout
        self.baseUrl = "https://api.wall-box.com/"
        self.authUrl = "https://user-api.wall-box.com/"
        self.jwtToken = ""
        self.jwtTokenTtl = 0
        self.headers = {
            "Accept": "application/json",
            "Content-Type": "application/json;charset=UTF-8",
        }

    @property
    def requestGetTimeout(self):
        return self._requestGetTimeout

    def authenticate(self):
        if self.jwtToken != "" and self.jwtTokenTtl > datetime.timestamp(datetime.now()):
            return

        try:
            response = requests.get(
                f"{self.authUrl}users/signin",
                auth=HTTPBasicAuth(self.username, self.password),
                headers={'Partner': 'wallbox'},
                timeout=self._requestGetTimeout
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise (err)
        self.jwtToken = json.loads(response.text)["data"]["attributes"]["token"]
        self.jwtTokenTtl = json.loads(response.text)["data"]["attributes"]["ttl"]
        self.headers["Authorization"] = f"Bearer {self.jwtToken}"

    def getChargersList(self):
        chargerIds = []
        try:
            response = requests.get(
                f"{self.baseUrl}v3/chargers/groups", headers=self.headers,
                timeout=self._requestGetTimeout
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise (err)
        for group in json.loads(response.text)["result"]["groups"]:
            for charger in group["chargers"]:
                chargerIds.append(charger["id"])
        return chargerIds

     

    def unlockCharger(self, chargerId):
        try:
            response = requests.put(
                f"{self.baseUrl}v2/charger/{chargerId}",
                headers=self.headers,
                data='{"locked":0}',
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise (err)
        return json.loads(response.text)

    def lockCharger(self, chargerId):
        try:
            response = requests.put(
                f"{self.baseUrl}v2/charger/{chargerId}",
                headers=self.headers,
                data='{"locked":1}',
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise (err)
        return json.loads(response.text)

    def setMaxChargingCurrent(self, chargerId, newMaxChargingCurrentValue):
        try:
            response = requests.put(
                f"{self.baseUrl}v2/charger/{chargerId}",
                headers=self.headers,
                data=f'{{ "maxChargingCurrent":{newMaxChargingCurrentValue}}}',
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise (err)
        return json.loads(response.text)

    def pauseChargingSession(self, chargerId):
        try:
            response = requests.post(
                f"{self.baseUrl}v3/chargers/{chargerId}/remote-action",
                headers=self.headers,
                data='{"action":2}',
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise (err)
        return json.loads(response.text)

    def resumeChargingSession(self, chargerId):
        try:
            response = requests.post(
                f"{self.baseUrl}v3/chargers/{chargerId}/remote-action",
                headers=self.headers,
                data='{"action":1}',
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise (err)
        return json.loads(response.text)

    def getSessionList(self, chargerId, startDate, endDate):
        try:
            payload = {'charger': chargerId, 'start_date': startDate.timestamp(), 'end_date': endDate.timestamp() }

            response = requests.get(
                f"{self.baseUrl}v4/sessions/stats", params=payload, headers=self.headers,
                timeout=self._requestGetTimeout
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise (err)
        return json.loads(response.text)

Code Python du fichier statuses.py

from aenum import MultiValueEnum


class Statuses(MultiValueEnum):
    WAITING = 164, 180, 181, 183, 184, 185, 186, 187, 188, 189,
    CHARGING = 193, 194, 195,
    READY = 161, 162,
    PAUSED = 178, 182,
    SCHEDULED = 177, 179,
    DISCHARGING = 196,
    ERROR = 14, 15,
    DISCONNECTED = 0, 163, None,
    LOCKED = 209, 210, 165,
    UPDATING = 166

Code du fichier init.py

# Wallbox EV module __init__.py
from wallbox.wallbox import Wallbox
from wallbox.statuses import Statuses