Open
Description
Specifications
Client Version: 1.37.0
InfluxDB v2.6.1
Server: 9dcf880
Frontend: 5ba8e15
Code sample to reproduce problem
The problem occurs when writing a pandas DataFrame to the InfluxDB server using the `write_influx` method of the class below:
class SetupInflux:
def __init__(self, influx_url, token, org_id, influx_bucket, res, debug=False, verbose=True):
from influxdb_client import InfluxDBClient
self.influx_url = influx_url
self.token = token
self.org_id = org_id
self.influx_bucket = influx_bucket
self.debug = debug
self.verbose = verbose
self.res = res
self.client = InfluxDBClient(url=self.influx_url, token=self.token, org=self.org_id, debug=False)
self.test = self.test_influx()
return
def __del__(self):
self.client.__del__()
def get_start_times(self, devices, default_start, dynamic):
"""Get latest InfluxDB timestamps for devices for use as 'start times' for listing log files from S3"""
from datetime import datetime, timedelta
from dateutil.tz import tzutc
default_start_dt = datetime.strptime(default_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tzutc())
device_ids = [device.split("/")[1] for device in devices]
start_times = []
if dynamic == False or self.test == 0:
for device in device_ids:
last_time = default_start_dt
start_times.append(last_time)
elif self.test != 0:
for device in device_ids:
influx_time = self.client.query_api().query(
f'from(bucket:"{self.influx_bucket}") |> range(start: -100d) |> filter(fn: (r) => r["_measurement"] == "{device}") |> group() |> last()'
)
if len(influx_time) == 0:
last_time = default_start_dt
else:
last_time = influx_time[0].records[0]["_time"]
last_time = last_time + timedelta(seconds=2)
start_times.append(last_time)
if self.verbose:
print(f"Log files will be fetched for {device} from {last_time}")
return start_times
def add_signal_tags(self, df_signal):
"""Advanced: This can be used to add custom tags to the signals
based on a specific use case logic. In effect, this will
split the signal into multiple timeseries
"""
tag_columns = ["tag"]
def event_test(row):
return "event" if row[0] > 1200 else "no event"
for tag in tag_columns:
df_signal[tag] = df_signal.apply(lambda row: event_test(row), axis=1)
return tag_columns, df_signal
def write_signals(self, device_id, df_phys):
"""Given a device ID and a dataframe of physical values,
resample and write each signal to a time series database
:param device_id: ID of device (used as the 'measurement name')
:param df_phys: Dataframe of physical values (e.g. as per output of can_decoder)
"""
tag_columns = []
if df_phys.empty:
print("Warning: Dataframe is empty, no data written")
return
else:
if self.res != "":
self.write_influx(device_id, df_phys, [])
else:
for signal, group in df_phys.groupby("Signal")["Physical Value"]:
df_signal = group.to_frame().rename(columns={"Physical Value": signal})
if self.res != "":
df_signal = df_signal.resample(self.res).ffill().dropna()
if self.verbose:
print(f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)} | records: {len(df_signal)} | resampling: {self.res})")
# tag_columns, df_signal = self.add_signal_tags(df_signal)
self.write_influx(device_id, df_signal, tag_columns)
def write_influx(self, name, df, tag_columns):
"""Helper function to write signal dataframes to InfluxDB"""
from influxdb_client import WriteOptions
if self.test == 0:
print("Please check your InfluxDB credentials")
return
with self.client.write_api(
write_options=WriteOptions(
batch_size=20_000,
flush_interval=1_000,
jitter_interval=0,
retry_interval=5_000,
)
) as _write_client:
_write_client.write(self.influx_bucket, record=df, data_frame_measurement_name=name,
data_frame_tag_columns=tag_columns)
if self.verbose:
print(f"- SUCCESS: {len(df.index)} records of {name} written to InfluxDB\n\n")
if self.test != 0:
_write_client.__del__()
Expected behavior
I expect that the data is simply written without exceptions or errors. It works as intended in terms of writing the data.
Actual behavior
After the script has finished running, I get the following error:
Exception ignored in: <function InfluxDBClient.__del__ at 0x00000232E7BBC430>
Traceback (most recent call last):
File "C:\Users\marti\AppData\Local\Programs\Python\Python39\lib\site-packages\influxdb_client\client\influxdb_client.py", line 320, in __del__
File "C:\Users\marti\AppData\Local\Programs\Python\Python39\lib\site-packages\influxdb_client\_sync\api_client.py", line 84, in __del__
File "C:\Users\marti\AppData\Local\Programs\Python\Python39\lib\site-packages\influxdb_client\_sync\api_client.py", line 661, in _signout
TypeError: 'NoneType' object is not callable
Exception ignored in: <function ApiClient.__del__ at 0x00000232E7BFE700>
Traceback (most recent call last):
File "C:\Users\marti\AppData\Local\Programs\Python\Python39\lib\site-packages\influxdb_client\_sync\api_client.py", line 84, in __del__
File "C:\Users\marti\AppData\Local\Programs\Python\Python39\lib\site-packages\influxdb_client\_sync\api_client.py", line 661, in _signout
TypeError: 'NoneType' object is not callable
Additional info
No response
Metadata
Metadata
Assignees
Labels
Further information is requested