Code Python du fichier test.py
from wallbox import Wallbox, Statuses
import time
import datetime
# Example script: authenticate, list the chargers on the account, then for
# each charger print its status and recent sessions and cycle its lock.

# Seconds to wait after a lock/unlock command before re-reading the status.
SETTLE_SECONDS = 10

# Placeholder credentials; replace with a real account before running.
w = Wallbox("user@email", "password")

# Authenticate with the credentials above
w.authenticate()

# Fetch the charger list once and reuse it for both the print and the loop,
# instead of issuing the same request twice.
chargerIds = w.getChargersList()

# Print a list of chargers in the account
print(chargerIds)

# Get charger data for all chargers in the list, then lock and unlock chargers
for chargerId in chargerIds:
    chargerStatus = w.getChargerStatus(chargerId)
    print(f"Charger Status: {chargerStatus}")
    print(f"Lock Charger {chargerId}")

    # Sessions recorded over the last 30 days.
    endDate = datetime.datetime.now()
    startDate = endDate - datetime.timedelta(days=30)
    sessionList = w.getSessionList(chargerId, startDate, endDate)
    print(f"Session List: {sessionList}")

    w.lockCharger(chargerId)
    time.sleep(SETTLE_SECONDS)
    chargerStatus = w.getChargerStatus(chargerId)
    print(f"Charger {chargerId} lock status {chargerStatus['config_data']['locked']}")

    print(f"Unlock Charger {chargerId}")
    w.unlockCharger(chargerId)
    time.sleep(SETTLE_SECONDS)
    chargerStatus = w.getChargerStatus(chargerId)
    print(f"Charger {chargerId} lock status {chargerStatus['config_data']['locked']}")

    # Print the status the charger is currently in using the status id
    print(f"Charger Mode: {Statuses(chargerStatus['status_id']).name}")
Code Python du fichier wallbox.py
"""
Wallbox class
"""
from datetime import datetime
from time import timezone
from requests.auth import HTTPBasicAuth
import requests
import json
class Wallbox:
    """Client for the Wallbox cloud REST API.

    Call ``authenticate()`` before any other request: it stores the bearer
    token in ``self.headers``, which every later call sends.

    Every request method raises ``requests.exceptions.HTTPError`` when the
    server answers with an error status. Otherwise it returns the decoded
    JSON body, except ``getChargersList`` (a list of charger ids) and
    ``authenticate`` (``None``).
    """

    def __init__(self, username, password, requestGetTimeout=None):
        """Store credentials and request defaults; performs no network I/O.

        Args:
            username: Account login, sent via HTTP basic auth at sign-in.
            password: Account password, sent via HTTP basic auth at sign-in.
            requestGetTimeout: Timeout handed to ``requests`` for every
                call. ``None`` (the default) means no timeout.
        """
        self.username = username
        self.password = password
        self._requestGetTimeout = requestGetTimeout
        self.baseUrl = "https://api.wall-box.com/"
        self.authUrl = "https://user-api.wall-box.com/"
        self.jwtToken = ""
        self.jwtTokenTtl = 0
        self.headers = {
            "Accept": "application/json",
            "Content-Type": "application/json;charset=UTF-8",
        }

    @property
    def requestGetTimeout(self):
        """Timeout given at construction time (read-only)."""
        return self._requestGetTimeout

    def _request(self, method, url, **kwargs):
        """Send one HTTP request and return the response.

        The configured timeout is applied to every verb, not only GET, so a
        stalled PUT/POST cannot hang the caller when a timeout was requested.

        Raises:
            requests.exceptions.HTTPError: On a 4xx/5xx response.
        """
        response = requests.request(
            method, url, timeout=self._requestGetTimeout, **kwargs
        )
        response.raise_for_status()
        return response

    def _api(self, method, path, **kwargs):
        """Send an authenticated request to ``baseUrl + path``; return its JSON body."""
        response = self._request(
            method, f"{self.baseUrl}{path}", headers=self.headers, **kwargs
        )
        return json.loads(response.text)

    def authenticate(self):
        """Sign in and store the bearer token in ``self.headers``.

        Does nothing when a token is already held and its ``ttl`` is still
        greater than the current Unix timestamp.

        Raises:
            requests.exceptions.HTTPError: If sign-in is rejected.
        """
        # NOTE(review): this compares ttl against a timestamp in seconds. If
        # the API reports ttl in another unit (e.g. milliseconds) the cached
        # token would never be treated as expired -- confirm.
        if self.jwtToken != "" and self.jwtTokenTtl > datetime.now().timestamp():
            return

        response = self._request(
            "GET",
            f"{self.authUrl}users/signin",
            auth=HTTPBasicAuth(self.username, self.password),
            headers={'Partner': 'wallbox'},
        )
        # Parse the body once and read both fields from it.
        attributes = json.loads(response.text)["data"]["attributes"]
        self.jwtToken = attributes["token"]
        self.jwtTokenTtl = attributes["ttl"]
        self.headers["Authorization"] = f"Bearer {self.jwtToken}"

    def getChargersList(self):
        """Return the ids of all chargers across every group of the account."""
        groups = self._api("GET", "v3/chargers/groups")["result"]["groups"]
        return [charger["id"] for group in groups for charger in group["chargers"]]

    def getChargerStatus(self, chargerId):
        """Return the decoded status document of one charger."""
        # NOTE(review): this endpoint path is not shown elsewhere in this
        # file; it follows the published wallbox package -- confirm.
        return self._api("GET", f"chargers/status/{chargerId}")

    def unlockCharger(self, chargerId):
        """Unlock the charger and return the decoded API response."""
        return self._api("PUT", f"v2/charger/{chargerId}", data='{"locked":0}')

    def lockCharger(self, chargerId):
        """Lock the charger and return the decoded API response."""
        return self._api("PUT", f"v2/charger/{chargerId}", data='{"locked":1}')

    def setMaxChargingCurrent(self, chargerId, newMaxChargingCurrentValue):
        """Set the charger's maximum charging current; return the API response.

        The value is interpolated into the JSON body as-is, so it must
        render as a valid JSON number.
        """
        return self._api(
            "PUT",
            f"v2/charger/{chargerId}",
            data=f'{{ "maxChargingCurrent":{newMaxChargingCurrentValue}}}',
        )

    def pauseChargingSession(self, chargerId):
        """Send remote action 2 to the charger; return the API response."""
        return self._api(
            "POST", f"v3/chargers/{chargerId}/remote-action", data='{"action":2}'
        )

    def resumeChargingSession(self, chargerId):
        """Send remote action 1 to the charger; return the API response."""
        return self._api(
            "POST", f"v3/chargers/{chargerId}/remote-action", data='{"action":1}'
        )

    def getSessionList(self, chargerId, startDate, endDate):
        """Return session statistics for a charger between two datetimes.

        Args:
            chargerId: Id of the charger to query.
            startDate: ``datetime`` marking the start of the window.
            endDate: ``datetime`` marking the end of the window.
        """
        payload = {
            'charger': chargerId,
            'start_date': startDate.timestamp(),
            'end_date': endDate.timestamp(),
        }
        return self._api("GET", "v4/sessions/stats", params=payload)
Code Python du fichier statuses.py
from aenum import MultiValueEnum
class Statuses(MultiValueEnum):
    """Named charger states, each reachable from one or more status ids.

    Every value listed for a member resolves to that member, so
    ``Statuses(status_id).name`` turns a raw status id into a state name.
    """

    WAITING = (164, 180, 181, 183, 184, 185, 186, 187, 188, 189)
    CHARGING = (193, 194, 195)
    READY = (161, 162)
    PAUSED = (178, 182)
    SCHEDULED = (177, 179)
    DISCHARGING = (196,)
    ERROR = (14, 15)
    # None is listed too, so Statuses(None) resolves to DISCONNECTED.
    DISCONNECTED = (0, 163, None)
    LOCKED = (209, 210, 165)
    UPDATING = 166
Code Python du fichier __init__.py
# Wallbox EV module __init__.py
from wallbox.wallbox import Wallbox
from wallbox.statuses import Statuses