adjust log levels (more info, less debug), rename LOG objects
knudmoeller committed Feb 1, 2024
1 parent 5ed718c commit bd26b66
Showing 5 changed files with 57 additions and 56 deletions.
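
For context, the renaming and level changes below all follow the same pattern: each module keeps a single module-level logger, renamed from lowercase `log` to the constant-style `LOG`, and routine progress messages are promoted from `debug` to `info` so they are visible at the default log level. A minimal sketch of that convention, assuming nothing beyond the standard library (`harvest_something` is an illustrative name, not a function in this codebase):

```python
import logging

# One logger per module, named after the module; the uppercase LOG marks it
# as a module-level constant (previously the lowercase name `log`).
LOG = logging.getLogger(__name__)

def harvest_something(url):
    # Operational progress is logged at INFO so it shows up by default ...
    LOG.info(f"Starting harvest for {url}")
    # ... while DEBUG stays reserved for genuinely diagnostic detail.
    LOG.debug("low-level diagnostic output")
```

Some stages below additionally create child loggers such as `logging.getLogger(__name__ + '.CSW.gather')`, which slot into the same logger hierarchy.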
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## Development

- Remove travis-requirements.txt.
- Adjust log-levels.

## [1.4.2](https://github.com/berlinonline/ckanext-fisbroker/releases/tag/1.4.2)

18 changes: 9 additions & 9 deletions ckanext/fisbroker/csw_client.py
@@ -13,7 +13,7 @@
import ckanext.spatial.lib.csw_client as csw_client
from ckanext.spatial.harvesters.base import text_traceback

log = logging.getLogger(__name__)
LOG = logging.getLogger(__name__)

class CswService(csw_client.CswService):
"""
@@ -61,8 +61,8 @@ def getidentifiers(self, qtype=None, typenames="csw:Record", esn="brief",
while True:
# repeat the request up to [retries] times
for attempt in range(1, retries + 1):
log.info(f"Making CSW request: getrecords2 {kwa}")
log.info(f"Attempt #{attempt} of {retries}")
LOG.info(f"Making CSW request: getrecords2 {kwa}")
LOG.info(f"Attempt #{attempt} of {retries}")

try:
csw.getrecords2(**kwa)
@@ -77,8 +77,8 @@ def getidentifiers(self, qtype=None, typenames="csw:Record", esn="brief",
except Exception as e2:
err = f"Error getting identifiers, text_traceback() failed ({e2})"
if attempt < retries:
log.info(err)
log.info(f"waiting {wait_time} seconds...")
LOG.info(err)
LOG.info(f"waiting {wait_time} seconds...")
sleep(wait_time)
continue
else:
@@ -114,8 +114,8 @@ def getrecordbyid(self, ids=[], esn="full", outputschema="gmd", retries=3, wait_
"outputschema": namespaces[outputschema],
}
for attempt in range(1, retries + 1):
log.info(f"Making CSW request: getrecordbyid {ids} {kwa}")
log.info(f"Attempt #{attempt} of {retries}")
LOG.info(f"Making CSW request: getrecordbyid {ids} {kwa}")
LOG.info(f"Attempt #{attempt} of {retries}")

try:
csw.getrecordbyid(ids, **kwa)
@@ -130,8 +130,8 @@ def getrecordbyid(self, ids=[], esn="full", outputschema="gmd", retries=3, wait_
except Exception as e2:
err = f"Error getting record by id, text_traceback() failed ({e})"
if attempt < retries:
log.info(err)
log.info(f"waiting {wait_time} seconds...")
LOG.info(err)
LOG.info(f"waiting {wait_time} seconds...")
sleep(wait_time)
continue
else:
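Both `getidentifiers` and `getrecordbyid` above wrap their OWSLib call in the same retry shape: attempt up to `retries` times, log at INFO and sleep `wait_time` seconds between attempts, and only surface the error after the final failure. A self-contained sketch of that pattern, with `make_request` standing in for `csw.getrecords2` / `csw.getrecordbyid` (it is a placeholder, not a function from this codebase):

```python
import logging
from time import sleep

LOG = logging.getLogger(__name__)

def request_with_retries(make_request, retries=3, wait_time=2.0):
    """Call make_request() up to `retries` times, sleeping between attempts."""
    for attempt in range(1, retries + 1):
        LOG.info(f"Attempt #{attempt} of {retries}")
        try:
            return make_request()
        except Exception as e:
            if attempt < retries:
                LOG.info(f"Request failed ({e}), waiting {wait_time} seconds...")
                sleep(wait_time)
                continue
            raise  # final attempt: let the caller record the error
```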
82 changes: 41 additions & 41 deletions ckanext/fisbroker/fisbroker_harvester.py
@@ -281,7 +281,7 @@ def get_import_since_date(self, harvest_job):
import_since = self.source_config['import_since']
if import_since == 'last_error_free':
last_error_free_job = self.last_error_free_job(harvest_job)
LOG.debug('Last error-free job: %r', last_error_free_job)
LOG.info('Last error-free job: %r', last_error_free_job)
if last_error_free_job:
gather_time = (last_error_free_job.gather_started +
timedelta(hours=self.get_timedelta()))
@@ -374,8 +374,8 @@ def validate_config(self, config):
return CSWHarvester.validate_config(self, config)

def gather_stage(self, harvest_job):
log = logging.getLogger(__name__ + '.CSW.gather')
log.debug(f"FisbrokerPlugin gather_stage for job: {harvest_job}")
LOG = logging.getLogger(__name__ + '.CSW.gather')
LOG.info(f"FisbrokerPlugin gather_stage for job: {harvest_job}")
# Get source URL
url = harvest_job.source.url

@@ -384,7 +384,7 @@
try:
self._setup_csw_client(url)
except Exception as e:
log.debug(f"Error contacting the CSW server: {e}")
LOG.error(f"Error contacting the CSW server: {e}")
self._save_gather_error(f"Error contacting the CSW server: {e}", harvest_job)
return None

@@ -406,9 +406,9 @@ def get_identifiers(constraints=[]):
for identifier in self.csw.getidentifiers(page=10, outputschema=self.output_schema(),
cql=cql, constraints=constraints):
try:
log.info(f"Got identifier {identifier} from the CSW")
LOG.info(f"Got identifier {identifier} from the CSW")
if identifier is None:
log.error(f"CSW returned identifier {identifier}, skipping...")
LOG.error(f"CSW returned identifier {identifier}, skipping...")
continue

guid_set.add(identifier)
@@ -421,23 +421,23 @@ def get_identifiers(constraints=[]):
# first get the (date)constrained set of identifiers,
# to figure what was added and/or changed
# only those identifiers will be fetched
log.debug(f"Starting gathering for {url} (constrained)")
LOG.info(f"Starting gathering for {url} (constrained)")

try:
constraints = self.get_constraints(harvest_job)
guids_in_harvest_constrained = get_identifiers(constraints)

# then get the complete set of identifiers, to figure out
# what was deleted
log.debug(f"Starting gathering for {url} (unconstrained)")
LOG.info(f"Starting gathering for {url} (unconstrained)")
guids_in_harvest_complete = set()
if (constraints == []):
log.debug("There were no constraints, so GUIDS(unconstrained) == GUIDs(constrained)")
LOG.info("There were no constraints, so GUIDS(unconstrained) == GUIDs(constrained)")
guids_in_harvest_complete = guids_in_harvest_constrained
else:
guids_in_harvest_complete = get_identifiers()
except Exception as e:
log.error(f"Exception: {text_traceback()}")
LOG.error(f"Exception: {text_traceback()}")
self._save_gather_error(
f"Error gathering the identifiers from the CSW server [{str(e)}]", harvest_job)
return None
@@ -454,9 +454,9 @@ def get_identifiers(constraints=[]):
# (constrained) harvest
change = guids_in_db & guids_in_harvest_constrained

log.debug(f"|new GUIDs|: {len(new)}")
log.debug(f"|deleted GUIDs|: {len(delete)}")
log.debug(f"|changed GUIDs|: {len(change)}")
LOG.info(f"|new GUIDs|: {len(new)}")
LOG.info(f"|deleted GUIDs|: {len(delete)}")
LOG.info(f"|changed GUIDs|: {len(change)}")

ids = []
for guid in new:
@@ -495,13 +495,13 @@ def fetch_stage(self,harvest_object, retries=3, wait_time=5.0):
# No need to fetch anything, just pass to the import stage
return True

log = logging.getLogger(__name__ + '.CSW.fetch')
log.debug(f"CswHarvester fetch_stage for object: {harvest_object.id}")
LOG = logging.getLogger(__name__ + '.CSW.fetch')
LOG.info(f"CswHarvester fetch_stage for object: {harvest_object.id}")

url = harvest_object.source.url
for attempt in range(1, retries + 1):
try:
log.info(f"Setting up CSW client: Attempt #{attempt} of {retries}")
LOG.info(f"Setting up CSW client: Attempt #{attempt} of {retries}")
self._setup_csw_client(url)
break
except Exception as e:
@@ -510,10 +510,10 @@ def fetch_stage(self,harvest_object, retries=3, wait_time=5.0):
except Exception as e2:
err = f"Error setting up CSW client, text_traceback() failed ({e2})"
if attempt < retries:
log.info(err)
log.info(f"waiting {wait_time} seconds...")
LOG.info(err)
LOG.info(f"waiting {wait_time} seconds...")
sleep(wait_time)
log.info(f"Repeating request! (attempt #{(attempt + 1)})")
LOG.info(f"Repeating request! (attempt #{(attempt + 1)})")
continue
else:
self._save_object_error(f"Error setting up CSW client: {e}",
@@ -543,7 +543,7 @@ def fetch_stage(self,harvest_object, retries=3, wait_time=5.0):
self._save_object_error(f"Error saving the harvest object for GUID {identifier} [{e}]", harvest_object)
return False

log.debug(f"XML content saved (len {len(record['xml'])})")
LOG.info(f"XML content saved (len {len(record['xml'])})")
return True

def import_stage(self, harvest_object):
@@ -553,11 +553,11 @@ def import_stage(self, harvest_object):
'user': self._get_user_name(),
}

log = logging.getLogger(__name__ + '.import')
log.debug(f"Import stage for harvest object: {harvest_object.id}")
LOG = logging.getLogger(__name__ + '.import')
LOG.info(f"Import stage for harvest object: {harvest_object.id}")

if not harvest_object:
log.error("No harvest object received")
LOG.error("No harvest object received")
return False

self._set_source_config(harvest_object.source.config)
@@ -579,7 +579,7 @@ def import_stage(self, harvest_object):
'ignore_auth': True,
})
toolkit.get_action('package_delete')(context, {'id': harvest_object.package_id})
log.info(f"Deleted package {harvest_object.package_id} with guid {harvest_object.guid}")
LOG.info(f"Deleted package {harvest_object.package_id} with guid {harvest_object.guid}")

return True

@@ -591,7 +591,7 @@ def import_stage(self, harvest_object):
self.__base_transform_to_iso_called = False
content = self.transform_to_iso(original_document, original_format, harvest_object)
if not self.__base_transform_to_iso_called:
log.warn("Deprecation warning: calling transform_to_iso directly is deprecated. " +
LOG.warn("Deprecation warning: calling transform_to_iso directly is deprecated. " +
"Please use the ISpatialHarvester interface method instead.")

for harvester in plugins.PluginImplementations(ISpatialHarvester):
@@ -674,11 +674,11 @@ def import_stage(self, harvest_object):
'harvest_object': harvest_object,
})
if not package_dict:
log.error(f"No package dict returned, aborting import for object {harvest_object.id}")
LOG.error(f"No package dict returned, aborting import for object {harvest_object.id}")
return False

if package_dict == 'skip':
log.info(f"Skipping import for object {harvest_object.id}")
LOG.info(f"Skipping import for object {harvest_object.id}")
return 'unchanged'

# Create / update the package
@@ -711,7 +711,7 @@ def import_stage(self, harvest_object):
# So check if package_dict['name'] already exists. If so, change status to
# 'change'.
if status == 'new' and package:
log.info(f"Resource with guid {harvest_object.guid} looks new, but there is a package with the same name: '{package_name}'. Changing that package instead of creating a new one.")
LOG.info(f"Resource with guid {harvest_object.guid} looks new, but there is a package with the same name: '{package_name}'. Changing that package instead of creating a new one.")
status = 'change'
harvest_object.package_id = package.as_dict()['id']
harvest_object.add()
@@ -728,7 +728,7 @@ def import_stage(self, harvest_object):
# we need to create a new one.
if status == 'change' and not package:
# apparently the package was purged, a new one has to be created
log.info(f"There is no package named '{package_name}' for guid {harvest_object.guid}, creating a new one.")
LOG.info(f"There is no package named '{package_name}' for guid {harvest_object.guid}, creating a new one.")
status = 'new'


@@ -753,7 +753,7 @@ def import_stage(self, harvest_object):

try:
package_id = toolkit.get_action('package_create')(context, package_dict)
log.info('Created new package %s with guid %s', package_id, harvest_object.guid)
LOG.info('Created new package %s with guid %s', package_id, harvest_object.guid)
except toolkit.ValidationError as e:
self._save_object_error('Validation Error: %s' % six.text_type(e.error_summary), harvest_object, 'Import')
return False
@@ -763,7 +763,7 @@ def import_stage(self, harvest_object):
# if the package was deleted, make it active again (state in FIS-Broker takes
# precedence)
if package.state == "deleted":
log.info(f"The package named {package_dict['name']} was deleted, activating it again.")
LOG.info(f"The package named {package_dict['name']} was deleted, activating it again.")
package.state = "active"

# Check if the modified date is more recent
@@ -796,7 +796,7 @@ def import_stage(self, harvest_object):
package_index = PackageSearchIndex()
package_index.index_package(package_dict)

log.info(f"Document with GUID {harvest_object.guid} unchanged, skipping...")
LOG.info(f"Document with GUID {harvest_object.guid} unchanged, skipping...")
else:
package_schema = logic.schema.default_update_package_schema()
package_schema['tags'] = tag_schema
@@ -805,7 +805,7 @@ def import_stage(self, harvest_object):
package_dict['id'] = harvest_object.package_id
try:
package_id = toolkit.get_action('package_update')(context, package_dict)
log.info(f"Updated package {package_id} with guid {harvest_object.guid}")
LOG.info(f"Updated package {package_id} with guid {harvest_object.guid}")
except toolkit.ValidationError as e:
self._save_object_error(f"Validation Error: {six.text_type(e.error_summary)}", harvest_object, 'Import')
return False
Expand All @@ -824,7 +824,7 @@ def get_validators(self):
'''Implementation of ckanext.spatial.interfaces.ISpatialHarvester.get_validators().
https://github.com/ckan/ckanext-spatial/blob/master/ckanext/spatial/interfaces.py
'''
LOG.debug("--------- get_validators ----------")
LOG.info("--------- get_validators ----------")
return [AlwaysValid]

def get_package_dict(self, context, data_dict):
@@ -840,7 +840,7 @@ def remember_error(harvest_object, error_dict):
if harvest_object:
harvest_object.extras.append(HarvestObjectExtra(key='error',value=json.dumps(error_dict)))

LOG.debug("--------- get_package_dict ----------")
LOG.info("--------- get_package_dict ----------")

if hasattr(data_dict, '__getitem__'):

@@ -852,17 +852,17 @@ def remember_error(harvest_object, error_dict):

# checking if marked for Open Data
if not marked_as_opendata(data_dict):
LOG.debug("no 'opendata' tag, skipping dataset ...")
LOG.info("no 'opendata' tag, skipping dataset ...")
remember_error(harvest_object, {'code': 1, 'description': 'not tagged as open data'})
return 'skip'
LOG.debug("this is tagged 'opendata', continuing ...")
LOG.info("this is tagged 'opendata', continuing ...")

# we're only interested in service resources
if not marked_as_service_resource(data_dict):
LOG.debug("this is not a service resource, skipping dataset ...")
LOG.info("this is not a service resource, skipping dataset ...")
remember_error(harvest_object, {'code': 2, 'description': 'not a service resource'})
return 'skip'
LOG.debug("this is a service resource, continuing ...")
LOG.info("this is a service resource, continuing ...")

extras = self.extras_dict(package_dict['extras'])

@@ -980,15 +980,15 @@ def remember_error(harvest_object, error_dict):
if 'temporal-extent-end' in extras:
extras['temporal_coverage_to'] = extras['temporal-extent-end']

# LOG.debug("----- data after get_package_dict -----")
# LOG.info("----- data after get_package_dict -----")
# LOG.debug(package_dict)

# extras
package_dict['extras'] = extras_as_list(extras)

return package_dict
else:
LOG.debug('calling get_package_dict on CSWHarvester')
LOG.info('calling get_package_dict on CSWHarvester')
return CSWHarvester.get_package_dict(self, context, data_dict)

@classmethod
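The GUID bookkeeping in `gather_stage` reduces to three set operations: the date-constrained set identifies additions and changes, while the unconstrained (complete) set identifies deletions. A standalone illustration with invented GUIDs; only the `change` expression appears verbatim in the diff above, so `new` and `delete` are inferred from the surrounding comments:

```python
guids_in_db = {"a", "b", "c"}                # datasets CKAN already knows
guids_in_harvest_complete = {"b", "c", "d"}  # everything on the server now
guids_in_harvest_constrained = {"c", "d"}    # subset matching the date constraint

new = guids_in_harvest_constrained - guids_in_db      # {"d"}: fetch and create
delete = guids_in_db - guids_in_harvest_complete      # {"a"}: gone remotely, delete locally
change = guids_in_db & guids_in_harvest_constrained   # {"c"}: fetch and update
```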
6 changes: 3 additions & 3 deletions ckanext/fisbroker/tests/mock_fis_broker.py
@@ -93,12 +93,12 @@ def do_GET(self):
# response will be served, leading to an error in the harvest job.
# This can be used for tests that somehow involve errored harvest jobs.
record_id = query.get('id')
LOG.debug(f"this is a GetRecordById request: {MockFISBroker.count_get_records}")
LOG.info(f"this is a GetRecordById request: {MockFISBroker.count_get_records}")
if record_id:
record_id = record_id[0]
if record_id not in RESPONSES['records']:
record_id = f"{record_id}_{str(MockFISBroker.count_get_records).rjust(2, '0')}"
LOG.debug(f"looking for {record_id}")
LOG.info(f"looking for {record_id}")
if record_id == "cannot_connect_00":
# mock a timeout happening during a GetRecordById request
raise Timeout()
@@ -145,7 +145,7 @@ def do_POST(self):
response_content = "<foo></foo>"
if csw_request == "{http://www.opengis.net/cat/csw/2.0.2}GetRecords":
MockFISBroker.count_get_records += 1
LOG.debug(f"this is a GetRecords request: {MockFISBroker.count_get_records}")
LOG.info(f"this is a GetRecords request: {MockFISBroker.count_get_records}")
response_content = RESPONSES['csw_getrecords_01']
response_code = 200
else:
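The mock server's central device, visible in both hunks above, is the class-level counter `count_get_records`: it is incremented on every GetRecords POST, and when a GetRecordById id is not found as-is, the zero-padded counter is appended so the same id can resolve to a different canned response on each harvest attempt. A condensed sketch of that lookup (the `RESPONSES` fixtures here are invented):

```python
RESPONSES = {"records": {
    "record_a_01": "<csw:Record>first attempt</csw:Record>",
    "record_a_02": "<csw:Record>second attempt</csw:Record>",
}}

class MockFISBroker:
    count_get_records = 0  # class-level: shared across all requests

    @classmethod
    def lookup_record(cls, record_id):
        # Unknown plain ids get the zero-padded request counter appended, so
        # attempt 1 and attempt 2 can be served different fixtures.
        if record_id not in RESPONSES["records"]:
            record_id = f"{record_id}_{str(cls.count_get_records).rjust(2, '0')}"
        return RESPONSES["records"].get(record_id)

# After the first GetRecords request the counter is 1:
MockFISBroker.count_get_records = 1
assert "first attempt" in MockFISBroker.lookup_record("record_a")
```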
6 changes: 3 additions & 3 deletions ckanext/fisbroker/tests/test_harvester.py
@@ -744,7 +744,7 @@ def test_last_error_free_does_not_return_reimport_job(self, app, base_context):
job_a.status = 'Finished'
job_a.save()

LOG.debug("successful job done ...")
LOG.info("successful job done ...")

# do an unsuccessful job
# This harvest job should fail, because the mock FIS-broker will look for a different
@@ -758,7 +758,7 @@ def test_last_error_free_does_not_return_reimport_job(self, app, base_context):
job_b.status = 'Finished'
job_b.save()

LOG.debug("unsuccessful job done ...")
LOG.info("unsuccessful job done ...")

# reset the mock server's counter
reset_mock_server(1)
@@ -771,7 +771,7 @@ def test_last_error_free_does_not_return_reimport_job(self, app, base_context):
extra_environ={'REMOTE_USER': base_context['user'].encode('ascii')}
)

LOG.debug("reimport job done ...")
LOG.info("reimport job done ...")

new_job = self._create_job(source.id)
last_error_free_job = FisbrokerHarvester().last_error_free_job(new_job)
